001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command;
019
020import org.apache.oozie.ErrorCode;
021import org.apache.oozie.FaultInjection;
022import org.apache.oozie.XException;
023import org.apache.oozie.service.CallableQueueService;
024import org.apache.oozie.service.EventHandlerService;
025import org.apache.oozie.service.InstrumentationService;
026import org.apache.oozie.service.MemoryLocksService;
027import org.apache.oozie.service.Services;
028import org.apache.oozie.util.Instrumentation;
029import org.apache.oozie.util.MemoryLocks;
030import org.apache.oozie.util.XCallable;
031import org.apache.oozie.util.XLog;
032
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.UUID;
039import java.util.concurrent.atomic.AtomicBoolean;
040
/**
 * Base class for synchronous and asynchronous commands.
 * <p/>
 * Through its API it enforces the following life-cycle:
 * <p/>
 * <ul>
 * <li>single execution: a command instance can be executed only once</li>
 * <li>eager data loading: loads the data needed for the eager precondition check</li>
 * <li>eager precondition check: verifies the precondition before obtaining the lock</li>
 * <li>data loading: loads the data needed for the precondition check and the execution</li>
 * <li>precondition check: verifies that the precondition for execution is still met</li>
 * <li>locking: obtains an exclusive lock on the entity key before executing the command</li>
 * <li>execution: the command logic</li>
 * </ul>
 * <p/>
 * It has built-in instrumentation and logging.
 */
058public abstract class XCommand<T> implements XCallable<T> {
059    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.command.default.lock.timeout";
060
061    public static final String INSTRUMENTATION_GROUP = "commands";
062
063    public static final Long DEFAULT_REQUEUE_DELAY = 10L;
064
065    public XLog LOG = XLog.getLog(getClass());
066
067    private String key;
068    private String name;
069    private int priority;
070    private String type;
071    private long createdTime;
072    private MemoryLocks.LockToken lock;
073    private AtomicBoolean used = new AtomicBoolean(false);
074    private boolean inInterrupt = false;
075
076    private Map<Long, List<XCommand<?>>> commandQueue;
077    protected boolean dryrun = false;
078    protected Instrumentation instrumentation;
079
080    protected XLog.Info logInfo;
081    protected static EventHandlerService eventService;
082
083    /**
084     * Create a command.
085     *
086     * @param name command name.
087     * @param type command type.
088     * @param priority command priority.
089     */
090    public XCommand(String name, String type, int priority) {
091        this.name = name;
092        this.type = type;
093        this.priority = priority;
094        this.key = name + "_" + UUID.randomUUID();
095        createdTime = System.currentTimeMillis();
096        logInfo = new XLog.Info();
097        instrumentation = Services.get().get(InstrumentationService.class).get();
098        eventService = Services.get().get(EventHandlerService.class);
099    }
100
101    /**
102     * @param name command name.
103     * @param type command type.
104     * @param priority command priority.
105     * @param dryrun indicates if dryrun option is enabled. if enabled bundle will show a diagnostic output without
106     *        really running the job
107     */
108    public XCommand(String name, String type, int priority, boolean dryrun) {
109        this(name, type, priority);
110        this.dryrun = dryrun;
111    }
112
113    /**
114     * Return the command name.
115     *
116     * @return the command name.
117     */
118    @Override
119    public String getName() {
120        return name;
121    }
122
123    /**
124     * Return the callable type.
125     * <p/>
126     * The command type is used for concurrency throttling in the {@link CallableQueueService}.
127     *
128     * @return the command type.
129     */
130    @Override
131    public String getType() {
132        return type;
133    }
134
135    /**
136     * Return the priority of the command.
137     *
138     * @return the command priority.
139     */
140    @Override
141    public int getPriority() {
142        return priority;
143    }
144
145    /**
146     * Returns the creation time of the command.
147     *
148     * @return the command creation time, in milliseconds.
149     */
150    @Override
151    public long getCreatedTime() {
152        return createdTime;
153    }
154
155    /**
156     * Queue a command for execution after the current command execution completes.
157     * <p/>
158     * All commands queued during the execution of the current command will be queued for a single serial execution.
159     * <p/>
160     * If the command execution throws an exception, no command will be effectively queued.
161     *
162     * @param command command to queue.
163     */
164    protected void queue(XCommand<?> command) {
165        queue(command, 0);
166    }
167
168    /**
169     * Queue a command for delayed execution after the current command execution completes.
170     * <p/>
171     * All commands queued during the execution of the current command with the same delay will be queued for a single
172     * serial execution.
173     * <p/>
174     * If the command execution throws an exception, no command will be effectively queued.
175     *
176     * @param command command to queue.
177     * @param msDelay delay in milliseconds.
178     */
179    protected void queue(XCommand<?> command, long msDelay) {
180        if (commandQueue == null) {
181            commandQueue = new HashMap<Long, List<XCommand<?>>>();
182        }
183        List<XCommand<?>> list = commandQueue.get(msDelay);
184        if (list == null) {
185            list = new ArrayList<XCommand<?>>();
186            commandQueue.put(msDelay, list);
187        }
188        list.add(command);
189    }
190
191    /**
192     * Obtain an exclusive lock on the {link #getEntityKey}.
193     * <p/>
194     * A timeout of {link #getLockTimeOut} is used when trying to obtain the lock.
195     *
196     * @throws InterruptedException thrown if an interruption happened while trying to obtain the lock
197     * @throws CommandException thrown i the lock could not be obtained.
198     */
199    private void acquireLock() throws InterruptedException, CommandException {
200        if (getEntityKey() == null) {
201            // no lock for null entity key
202            return;
203        }
204        lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut());
205        if (lock == null) {
206            Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
207            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
208            if (isReQueueRequired()) {
209                //if not acquire the lock, re-queue itself with default delay
210                queue(this, getRequeueDelay());
211                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
212            } else {
213                throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
214            }
215        } else {
216            LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
217        }
218    }
219
220    /**
221     * Release the lock on the {link #getEntityKey}.
222     */
223    private void releaseLock() {
224        if (lock != null) {
225            lock.release();
226            LOG.debug("Released lock for [{0}] in [{1}]", getEntityKey(), getName());
227        }
228    }
229
230    /**
231     * Implements the XCommand life-cycle.
232     *
233     * @return the {link #execute} return value.
234     * @throws Exception thrown if the command could not be executed.
235     */
236    @Override
237    public final T call() throws CommandException {
238        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
239            LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
240            return null;
241        }
242
243        commandQueue = null;
244        Instrumentation instrumentation = Services.get().get(InstrumentationService.class).get();
245        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
246        Instrumentation.Cron callCron = new Instrumentation.Cron();
247        try {
248            callCron.start();
249            eagerLoadState();
250            LOG = XLog.resetPrefix(LOG);
251            eagerVerifyPrecondition();
252            try {
253                T ret = null;
254                if (isLockRequired() && !this.inInterruptMode()) {
255                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
256                    acquireLockCron.start();
257                    acquireLock();
258                    acquireLockCron.stop();
259                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
260                }
261                // executing interrupts only in case of the lock required commands
262                if (lock != null) {
263                    this.executeInterrupts();
264                }
265
266                if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
267                    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
268                            && !used.compareAndSet(false, true)) {
269                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
270                        return null;
271                    }
272                    LOG.trace("Load state for [{0}]", getEntityKey());
273                    loadState();
274                    LOG = XLog.resetPrefix(LOG);
275                    LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
276                    verifyPrecondition();
277                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
278                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
279                    executeCron.start();
280                    ret = execute();
281                    executeCron.stop();
282                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
283                }
284                if (commandQueue != null) {
285                    CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
286                    for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
287                        LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
288                        if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
289                            LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
290                                    .size(), entry.getKey());
291                        }
292                    }
293                }
294                return ret;
295            }
296            finally {
297                if (isLockRequired() && !this.inInterruptMode()) {
298                    releaseLock();
299                }
300            }
301        }
302        catch(PreconditionException pex){
303            LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
304            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
305            return null;
306        }
307        catch (XException ex) {
308            LOG.error("XException, ", ex);
309            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
310            if (ex instanceof CommandException) {
311                throw (CommandException) ex;
312            }
313            else {
314                throw new CommandException(ex);
315            }
316        }
317        catch (Exception ex) {
318            LOG.error("Exception, ", ex);
319            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
320            throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
321        }
322        catch (Error er) {
323            LOG.error("Error, ", er);
324            throw er;
325        }
326        finally {
327            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
328            callCron.stop();
329            instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
330        }
331    }
332
333    /**
334     * Check for the existence of interrupts for the same lock key
335     * Execute them if exist.
336     *
337     */
338    protected void executeInterrupts() {
339        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
340        // getting all the list of interrupts to be executed
341        Set<XCallable<?>> callables = callableQueueService.checkInterrupts(this.getEntityKey());
342
343        if (callables != null) {
344            // executing the list of interrupts in the given order of insertion
345            // in the list
346            for (XCallable<?> callable : callables) {
347                LOG.trace("executing interrupt callable [{0}]", callable.getName());
348                try {
349                    // executing the callable in interrupt mode
350                    callable.setInterruptMode(true);
351                    callable.call();
352                    LOG.trace("executed interrupt callable [{0}]", callable.getName());
353                }
354                catch (Exception ex) {
355                    LOG.warn("exception interrupt callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
356                }
357                finally {
358                    // reseting the interrupt mode to false after the command is
359                    // executed
360                    callable.setInterruptMode(false);
361                }
362            }
363        }
364    }
365
366    /**
367     * Return the time out when acquiring a lock.
368     * <p/>
369     * The value is loaded from the Oozie configuration, the property {link #DEFAULT_LOCK_TIMEOUT}.
370     * <p/>
371     * Subclasses should override this method if they want to use a different time out.
372     *
373     * @return the lock time out in milliseconds.
374     */
375    protected long getLockTimeOut() {
376        return Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
377    }
378
379    /**
380     * Indicate if the the command requires locking.
381     * <p/>
382     * Subclasses should override this method if they require locking.
383     *
384     * @return <code>true/false</code>
385     */
386    protected abstract boolean isLockRequired();
387
388    /**
389     * Return the entity key for the command.
390     * <p/>
391     *
392     * @return the entity key for the command.
393     */
394    public abstract String getEntityKey();
395
396    /**
397     * Indicate if the the command requires to requeue itself if the lock is not acquired.
398     * <p/>
399     * Subclasses should override this method if they don't want to requeue.
400     * <p/>
401     * Default is true.
402     *
403     * @return <code>true/false</code>
404     */
405    protected boolean isReQueueRequired() {
406        return true;
407    }
408
409    /**
410     * Load the necessary state to perform an eager precondition check.
411     * <p/>
412     * This implementation does a NOP.
413     * <p/>
414     * Subclasses should override this method and load the state needed to do an eager precondition check.
415     * <p/>
416     * A trivial implementation is calling {link #loadState}.
417     */
418    protected void eagerLoadState() throws CommandException{
419    }
420
421    /**
422     * Verify the precondition for the command before obtaining a lock.
423     * <p/>
424     * This implementation does a NOP.
425     * <p/>
426     * A trivial implementation is calling {link #verifyPrecondition}.
427     *
428     * @throws CommandException thrown if the precondition is not met.
429     */
430    protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
431    }
432
433    /**
434     * Load the necessary state to perform the precondition check and to execute the command.
435     * <p/>
436     * Subclasses must implement this method and load the state needed to do the precondition check and execute the
437     * command.
438     */
439    protected abstract void loadState() throws CommandException;
440
441    /**
442     * Verify the precondition for the command after a lock has been obtain, just before executing the command.
443     * <p/>
444     *
445     * @throws CommandException thrown if the precondition is not met.
446     */
447    protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
448
449    /**
450     * Command execution body.
451     * <p/>
452     * This method will be invoked after the {link #loadState} and {link #verifyPrecondition} methods.
453     * <p/>
454     * If the command requires locking, this method will be invoked ONLY if the lock has been acquired.
455     *
456     * @return a return value from the execution of the command, only meaningful if the command is executed
457     *         synchronously.
458     * @throws CommandException thrown if the command execution failed.
459     */
460    protected abstract T execute() throws CommandException;
461
462
463    /**
464     * Return the {@link Instrumentation} instance in use.
465     *
466     * @return the {@link Instrumentation} instance in use.
467     */
468    protected Instrumentation getInstrumentation() {
469        return instrumentation;
470    }
471
472    /**
473     * @param used set false to the used
474     */
475    public void resetUsed() {
476        this.used.set(false);
477    }
478
479
480    /**
481     * Return the delay time for requeue
482     *
483     * @return delay time when requeue itself
484     */
485    protected Long getRequeueDelay() {
486        return DEFAULT_REQUEUE_DELAY;
487    }
488
489    /**
490     * Get command key
491     *
492     * @return command key
493     */
494    @Override
495    public String getKey(){
496        return this.key;
497    }
498
499    /**
500     * set the mode of execution for the callable. True if in interrupt, false
501     * if not
502     */
503    public void setInterruptMode(boolean mode) {
504        this.inInterrupt = mode;
505    }
506
507    /**
508     * @return the mode of execution. true if it is executed as an Interrupt,
509     *         false otherwise
510     */
511    public boolean inInterruptMode() {
512        return this.inInterrupt;
513    }
514
515    /**
516     * Get XLog log
517     *
518     * @return XLog
519     */
520    public XLog getLog() {
521        return LOG;
522    }
523
524}