/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.oozie.command;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.UUID;
023
024import org.apache.oozie.CoordinatorActionBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.FaultInjection;
028import org.apache.oozie.WorkflowActionBean;
029import org.apache.oozie.WorkflowJobBean;
030import org.apache.oozie.XException;
031import org.apache.oozie.service.CallableQueueService;
032import org.apache.oozie.service.DagXLogInfoService;
033import org.apache.oozie.service.InstrumentationService;
034import org.apache.oozie.service.MemoryLocksService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.service.StoreService;
037import org.apache.oozie.service.XLogService;
038import org.apache.oozie.store.Store;
039import org.apache.oozie.store.StoreException;
040import org.apache.oozie.store.WorkflowStore;
041import org.apache.oozie.util.Instrumentation;
042import org.apache.oozie.util.ParamChecker;
043import org.apache.oozie.util.XCallable;
044import org.apache.oozie.util.XLog;
045import org.apache.oozie.util.MemoryLocks.LockToken;
046
047/**
048 * Base class for all synchronous and asynchronous DagEngine commands.
049 */
050public abstract class Command<T, S extends Store> implements XCallable<T> {
051    /**
052     * The instrumentation group used for Commands.
053     */
054    private static final String INSTRUMENTATION_GROUP = "commands";
055
056    private final long createdTime;
057
058    /**
059     * The instrumentation group used for Jobs.
060     */
061    private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
062
063    private static final long LOCK_TIMEOUT = 1000;
064    protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
065
066    protected Instrumentation instrumentation;
067    private List<XCallable<Void>> callables;
068    private List<XCallable<Void>> delayedCallables;
069    private long delay = 0;
070    private List<XCallable<Void>> exceptionCallables;
071    private String name;
072    private String type;
073    private String key;
074    private int priority;
075    private int logMask;
076    private boolean withStore;
077    protected boolean dryrun = false;
078    private ArrayList<LockToken> locks = null;
079
080    /**
081     * This variable is package private for testing purposes only.
082     */
083    XLog.Info logInfo;
084
085    /**
086     * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
087     * captured for execution.
088     *
089     * @param name command name.
090     * @param type command type.
091     * @param priority priority of the command, used when queuing for asynchronous execution.
092     * @param logMask log mask for the command logging calls.
093     */
094    public Command(String name, String type, int priority, int logMask) {
095        this(name, type, priority, logMask, true);
096    }
097
098    /**
099     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
100     *
101     * @param name command name.
102     * @param type command type.
103     * @param priority priority of the command, used when queuing for asynchronous execution.
104     * @param logMask log mask for the command logging calls.
105     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
106     */
107    public Command(String name, String type, int priority, int logMask, boolean withStore) {
108        this.name = ParamChecker.notEmpty(name, "name");
109        this.type = ParamChecker.notEmpty(type, "type");
110        this.key = name + "_" + UUID.randomUUID();
111        this.priority = priority;
112        this.withStore = withStore;
113        this.logMask = logMask;
114        instrumentation = Services.get().get(InstrumentationService.class).get();
115        logInfo = new XLog.Info(XLog.Info.get());
116        createdTime = System.currentTimeMillis();
117        locks = new ArrayList<LockToken>();
118    }
119
120    /**
121     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
122     *
123     * @param name command name.
124     * @param type command type.
125     * @param priority priority of the command, used when queuing for asynchronous execution.
126     * @param logMask log mask for the command logging calls.
127     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
128     * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
129     * really submitting the job
130     */
131    public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
132        this(name, type, priority, logMask, withStore);
133        this.dryrun = dryrun;
134    }
135
136    /**
137     * Return the name of the command.
138     *
139     * @return the name of the command.
140     */
141    @Override
142    public String getName() {
143        return name;
144    }
145
146    /**
147     * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
148     * org.apache.oozie.service.CallableQueueService}.
149     *
150     * @return the callable type.
151     */
152    @Override
153    public String getType() {
154        return type;
155    }
156
157    /**
158     * Return the priority of the command.
159     *
160     * @return the priority of the command.
161     */
162    @Override
163    public int getPriority() {
164        return priority;
165    }
166
167    /**
168     * Returns the createdTime of the callable in milliseconds
169     *
170     * @return the callable createdTime
171     */
172    @Override
173    public long getCreatedTime() {
174        return createdTime;
175    }
176
177    /**
178     * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
179     * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
180     * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
181     * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
182     * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
183     * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
184     * commands queued for exception will be effectively queued fro execution..
185     *
186     * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
187     * without committing, thus doing a rollback.
188     */
189    @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
190    public final T call() throws CommandException {
191        XLog.Info.get().setParameters(logInfo);
192        XLog log = XLog.getLog(getClass());
193        log.trace(logMask, "Start");
194        Instrumentation.Cron cron = new Instrumentation.Cron();
195        cron.start();
196        callables = new ArrayList<XCallable<Void>>();
197        delayedCallables = new ArrayList<XCallable<Void>>();
198        exceptionCallables = new ArrayList<XCallable<Void>>();
199        delay = 0;
200        S store = null;
201        boolean exception = false;
202
203        try {
204            if (withStore) {
205                store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
206                store.beginTrx();
207            }
208            T result = execute(store);
209            /*
210             *
211             * if (store != null && log != null) { log.info(XLog.STD,
212             * "connection log from store Flush Mode {0} ",
213             * store.getFlushMode()); }
214             */
215            if (withStore) {
216                if (store == null) {
217                    throw new IllegalStateException("WorkflowStore should not be null");
218                }
219                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
220                    throw new RuntimeException("Skipping Commit for Failover Testing");
221                }
222                store.commitTrx();
223            }
224
225            // TODO figure out the reject due to concurrency problems and remove
226            // the delayed queuing for callables.
227            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
228            if (ret == false) {
229                logQueueCallableFalse(callables);
230            }
231
232            ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
233            if (ret == false) {
234                logQueueCallableFalse(delayedCallables);
235            }
236
237            return result;
238        }
239        catch (XException ex) {
240            log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex);
241            if (store != null) {
242                log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
243                        .isClosed());
244            }
245            exception = true;
246            if (store != null && store.isActive()) {
247                try {
248                    store.rollbackTrx();
249                }
250                catch (RuntimeException rex) {
251                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
252                }
253            }
254
255            // TODO figure out the reject due to concurrency problems and remove
256            // the delayed queuing for callables.
257            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
258            if (ret == false) {
259                logQueueCallableFalse(exceptionCallables);
260            }
261            if (ex instanceof CommandException) {
262                throw (CommandException) ex;
263            }
264            else {
265                throw new CommandException(ex);
266            }
267        }
268        catch (Exception ex) {
269            log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex);
270            exception = true;
271            if (store != null && store.isActive()) {
272                try {
273                    store.rollbackTrx();
274                }
275                catch (RuntimeException rex) {
276                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
277                }
278            }
279            throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex);
280        }
281        catch (Error er) {
282            log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er);
283            exception = true;
284            if (store != null && store.isActive()) {
285                try {
286                    store.rollbackTrx();
287                }
288                catch (RuntimeException rex) {
289                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
290                }
291            }
292            throw er;
293        }
294        finally {
295            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
296            cron.stop();
297            instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
298            incrCommandCounter(1);
299            log.trace(logMask, "End");
300            if (locks != null) {
301                for (LockToken lock : locks) {
302                    lock.release();
303                }
304                locks.clear();
305            }
306            if (store != null) {
307                if (!store.isActive()) {
308                    try {
309                        store.closeTrx();
310                    }
311                    catch (RuntimeException rex) {
312                        if (exception) {
313                            log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
314                        }
315                        else {
316                            throw rex;
317                        }
318                    }
319                }
320                else {
321                    log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
322                }
323            }
324        }
325    }
326
327    /**
328     * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
329     * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
330     * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
331     * are not queued for execution.
332     *
333     * @param callable callable to queue for execution.
334     */
335    protected void queueCallable(XCallable<Void> callable) {
336        callables.add(callable);
337    }
338
339    /**
340     * Queue a list of callables for execution after the current callable call invocation completes and the {@link
341     * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
342     * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
343     * discarded, they are not queued for execution.
344     *
345     * @param callables list of callables to queue for execution.
346     */
347    protected void queueCallable(List<? extends XCallable<Void>> callables) {
348        this.callables.addAll(callables);
349    }
350
351    /**
352     * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
353     * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
354     * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
355     * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
356     * execution.
357     *
358     * @param callable callable to queue for delayed execution.
359     * @param delay the queue delay in milliseconds
360     */
361    protected void queueCallable(XCallable<Void> callable, long delay) {
362        this.delayedCallables.add(callable);
363        this.delay = Math.max(this.delay, delay);
364    }
365
366    /**
367     * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
368     * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
369     * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
370     * serial execution.
371     *
372     * @param callable callable to queue for execution in the case of an exception.
373     */
374    protected void queueCallableForException(XCallable<Void> callable) {
375        exceptionCallables.add(callable);
376    }
377
378    /**
379     * Logging the info if failed to queue the callables.
380     *
381     * @param callables
382     */
383    protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
384        StringBuilder sb = new StringBuilder(
385                "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
386        int size = callables.size();
387        for (int i = 0; i < size; i++) {
388            XCallable<Void> callable = callables.get(i);
389            sb.append(callable.getName());
390            if (i < size - 1) {
391                sb.append(", ");
392            }
393            else {
394                sb.append("]");
395            }
396        }
397        XLog.getLog(getClass()).warn(sb.toString());
398    }
399
400    /**
401     * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
402     * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
403     * is rolledback.
404     *
405     * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
406     * store.
407     * @return the return value of the callable.
408     * @throws StoreException thrown if the workflow store could not perform an operation.
409     * @throws CommandException thrown if the command could not perform its operation.
410     */
411    protected abstract T call(S store) throws StoreException, CommandException;
412
413    // to do
414    // need to implement on all sub commands and break down the transactions
415
416    // protected abstract T execute(String id) throws CommandException;
417
418    /**
419     * Command subclasses must implement this method correct Store can be passed to call(store);
420     *
421     * @return the Store class for use by Callable
422     * @throws CommandException thrown if the command could not perform its operation.
423     */
424    protected abstract Class<? extends Store> getStoreClass();
425
426    /**
427     * Set the log info with the context of the given coordinator bean.
428     *
429     * @param cBean coordinator bean.
430     */
431    protected void setLogInfo(CoordinatorJobBean cBean) {
432        if (logInfo.getParameter(XLogService.GROUP) == null) {
433            logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
434        }
435        if (logInfo.getParameter(XLogService.USER) == null) {
436            logInfo.setParameter(XLogService.USER, cBean.getUser());
437        }
438        logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
439        logInfo.setParameter(DagXLogInfoService.TOKEN, "");
440        logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
441        XLog.Info.get().setParameters(logInfo);
442    }
443
444    /**
445     * Set the log info with the context of the given coordinator action bean.
446     *
447     * @param action action bean.
448     */
449    protected void setLogInfo(CoordinatorActionBean action) {
450        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
451        // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
452        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
453        XLog.Info.get().setParameters(logInfo);
454    }
455
456    /**
457     * Set the log info with the context of the given workflow bean.
458     *
459     * @param workflow workflow bean.
460     */
461    protected void setLogInfo(WorkflowJobBean workflow) {
462        if (logInfo.getParameter(XLogService.GROUP) == null) {
463            logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
464        }
465        if (logInfo.getParameter(XLogService.USER) == null) {
466            logInfo.setParameter(XLogService.USER, workflow.getUser());
467        }
468        logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
469        logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
470        logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
471        XLog.Info.get().setParameters(logInfo);
472    }
473
474    /**
475     * Set the log info with the context of the given action bean.
476     *
477     * @param action action bean.
478     */
479    protected void setLogInfo(WorkflowActionBean action) {
480        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
481        logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
482        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
483        XLog.Info.get().setParameters(logInfo);
484    }
485
486    /**
487     * Reset the action bean information from the log info.
488     */
489    // TODO check if they are used, else delete
490    protected void resetLogInfoAction() {
491        logInfo.clearParameter(DagXLogInfoService.ACTION);
492        XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
493    }
494
495    /**
496     * Reset the workflow bean information from the log info.
497     */
498    // TODO check if they are used, else delete
499    protected void resetLogInfoWorkflow() {
500        logInfo.clearParameter(DagXLogInfoService.JOB);
501        logInfo.clearParameter(DagXLogInfoService.APP);
502        logInfo.clearParameter(DagXLogInfoService.TOKEN);
503        XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
504        XLog.Info.get().clearParameter(DagXLogInfoService.APP);
505        XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
506    }
507
508    /**
509     * Convenience method to increment counters.
510     *
511     * @param group the group name.
512     * @param name the counter name.
513     * @param count increment count.
514     */
515    private void incrCounter(String group, String name, int count) {
516        if (instrumentation != null) {
517            instrumentation.incr(group, name, count);
518        }
519    }
520
521    /**
522     * Used to increment command counters.
523     *
524     * @param count the increment count.
525     */
526    protected void incrCommandCounter(int count) {
527        incrCounter(INSTRUMENTATION_GROUP, name, count);
528    }
529
530    /**
531     * Used to increment job counters. The counter name s the same as the command name.
532     *
533     * @param count the increment count.
534     */
535    protected void incrJobCounter(int count) {
536        incrJobCounter(name, count);
537    }
538
539    /**
540     * Used to increment job counters.
541     *
542     * @param name the job name.
543     * @param count the increment count.
544     */
545    protected void incrJobCounter(String name, int count) {
546        incrCounter(INSTRUMENTATION_JOB_GROUP, name, count);
547    }
548
549    /**
550     * Return the {@link Instrumentation} instance in use.
551     *
552     * @return the {@link Instrumentation} instance in use.
553     */
554    protected Instrumentation getInstrumentation() {
555        return instrumentation;
556    }
557
558    /**
559     * Return the identity.
560     *
561     * @return the identity.
562     */
563    @Override
564    public String toString() {
565        StringBuilder sb = new StringBuilder();
566        sb.append(getType());
567        sb.append(",").append(getPriority());
568        return sb.toString();
569    }
570
571    protected boolean lock(String id) throws InterruptedException {
572        if (id == null || id.length() == 0) {
573            XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
574            return false;
575        }
576        LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
577        if (token != null) {
578            locks.add(token);
579            return true;
580        }
581        else {
582            return false;
583        }
584    }
585
586    /*
587     * TODO - remove store coupling to EM. Store will only contain queries
588     * protected EntityManager getEntityManager() { return
589     * store.getEntityManager(); }
590     */
591    protected T execute(S store) throws CommandException, StoreException {
592        T result = call(store);
593        return result;
594    }
595
596    /**
597     * Get command key
598     *
599     * @return command key
600     */
601    @Override
602    public String getKey(){
603        return this.key;
604    }
605
606    /**
607     * Get command lock key returning the key as an entity key, [not used] Just
608     * to be able to implement XCallable [to be deprecated]
609     *
610     * @return key
611     */
612    @Override
613    public String getEntityKey() {
614        return this.key;
615    }
616
617    /**
618     * set the mode of execution for the callable. True if in interrupt, false
619     * if not [to be deprecated]
620     */
621    public void setInterruptMode(boolean mode) {
622    }
623
624    /**
625     * [to be deprecated]
626     *
627     * @return the mode of execution. true if it is executed as an Interrupt,
628     *         false otherwise
629     */
630    public boolean inInterruptMode() {
631        return false;
632    }
633
634}