/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.oozie.command.wf;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.Date;
025import java.util.Properties;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.oozie.DagELFunctions;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.WorkflowActionBean;
034import org.apache.oozie.WorkflowJobBean;
035import org.apache.oozie.action.ActionExecutor;
036import org.apache.oozie.client.WorkflowAction;
037import org.apache.oozie.client.WorkflowJob;
038import org.apache.oozie.command.CommandException;
039import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
040import org.apache.oozie.service.CallbackService;
041import org.apache.oozie.service.ELService;
042import org.apache.oozie.service.HadoopAccessorException;
043import org.apache.oozie.service.HadoopAccessorService;
044import org.apache.oozie.service.JPAService;
045import org.apache.oozie.service.LiteWorkflowStoreService;
046import org.apache.oozie.service.Services;
047import org.apache.oozie.util.ELEvaluator;
048import org.apache.oozie.util.InstrumentUtils;
049import org.apache.oozie.util.Instrumentation;
050import org.apache.oozie.util.XConfiguration;
051import org.apache.oozie.workflow.WorkflowException;
052import org.apache.oozie.workflow.WorkflowInstance;
053import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054
055/**
056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057 * attempting to start or end an action.
058 */
059public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
060    private static final String INSTRUMENTATION_GROUP = "action.executors";
061
062    protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
063
064    protected static final String RECOVERY_ID_SEPARATOR = "@";
065
066    public ActionXCommand(String name, String type, int priority) {
067        super(name, type, priority);
068    }
069
070    /**
071     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
072     * attempts have been made. Otherwise returns false.
073     *
074     * @param context the execution context.
075     * @param executor the executor instance being used.
076     * @param status the status to be set for the action.
077     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
078     *         maximum number of configured retries.
079     * @throws CommandException thrown if unable to handle transient
080     */
081    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
082            WorkflowAction.Status status) throws CommandException {
083        LOG.debug("Attempting to retry");
084        ActionExecutorContext aContext = (ActionExecutorContext) context;
085        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
086        incrActionErrorCounter(action.getType(), "transient", 1);
087
088        int actionRetryCount = action.getRetries();
089        if (actionRetryCount >= executor.getMaxRetries()) {
090            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
091            return false;
092        }
093        else {
094            action.setStatus(status);
095            action.setPending();
096            action.incRetries();
097            long retryDelayMillis = executor.getRetryInterval() * 1000;
098            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
099            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
100            this.resetUsed();
101            queue(this, retryDelayMillis);
102            return true;
103        }
104    }
105
106    /**
107     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
108     * set pending flag of action to false
109     *
110     * @param context the execution context.
111     * @param executor the executor instance being used.
112     * @param status the status to be set for the action.
113     * @throws CommandException thrown if unable to suspend job
114     */
115    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
116            WorkflowAction.Status status) throws CommandException {
117        ActionExecutorContext aContext = (ActionExecutorContext) context;
118        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
119        incrActionErrorCounter(action.getType(), "nontransient", 1);
120        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
121        String id = workflow.getId();
122        action.setStatus(status);
123        action.resetPendingOnly();
124        LOG.warn("Suspending Workflow Job id=" + id);
125        try {
126            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
127        }
128        catch (Exception e) {
129            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
130        }
131        finally {
132            // update coordinator action
133            new CoordActionUpdateXCommand(workflow, 3).call();
134        }
135    }
136
137    /**
138     * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
139     * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
140     * </p>
141     *
142     * @param context the execution context.
143     * @param executor the executor instance being used.
144     * @param message
145     * @param isStart whether the error was generated while starting or ending an action.
146     * @param status the status to be set for the action.
147     * @throws CommandException thrown if unable to handle action error
148     */
149    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
150            boolean isStart, WorkflowAction.Status status) throws CommandException {
151        LOG.warn("Setting Action Status to [{0}]", status);
152        ActionExecutorContext aContext = (ActionExecutorContext) context;
153        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
154
155        if (!handleUserRetry(action)) {
156            incrActionErrorCounter(action.getType(), "error", 1);
157            action.setPending();
158            if (isStart) {
159                action.setExecutionData(message, null);
160                queue(new ActionEndXCommand(action.getId(), action.getType()));
161            }
162            else {
163                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
164            }
165        }
166    }
167
168    /**
169     * Fail the job due to failed action
170     *
171     * @param context the execution context.
172     * @throws CommandException thrown if unable to fail job
173     */
174    public void failJob(ActionExecutor.Context context) throws CommandException {
175        ActionExecutorContext aContext = (ActionExecutorContext) context;
176        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
177        failJob(context, action);
178    }
179
180    /**
181     * Fail the job due to failed action
182     *
183     * @param context the execution context.
184     * @param action the action that caused the workflow to fail
185     * @throws CommandException thrown if unable to fail job
186     */
187    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
188        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
189        if (!handleUserRetry(action)) {
190            incrActionErrorCounter(action.getType(), "failed", 1);
191            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
192            try {
193                workflow.getWorkflowInstance().fail(action.getName());
194                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
195                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
196                workflow.setWorkflowInstance(wfInstance);
197                workflow.setStatus(WorkflowJob.Status.FAILED);
198                action.setStatus(WorkflowAction.Status.FAILED);
199                action.resetPending();
200                queue(new NotificationXCommand(workflow, action));
201                queue(new KillXCommand(workflow.getId()));
202                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
203            }
204            catch (WorkflowException ex) {
205                throw new CommandException(ex);
206            }
207        }
208    }
209
210    /**
211     * Execute retry for action if this action is eligible for user-retry
212     *
213     * @param context the execution context.
214     * @return true if user-retry has to be handled for this action
215     * @throws CommandException thrown if unable to fail job
216     */
217    public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
218        String errorCode = action.getErrorCode();
219        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
220
221        if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
222            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
223                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
224                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
225            int interval = action.getUserRetryInterval() * 60 * 1000;
226            action.setStatus(WorkflowAction.Status.USER_RETRY);
227            action.incrmentUserRetryCount();
228            action.setPending();
229            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
230            return true;
231        }
232        return false;
233    }
234
235        /*
236         * In case of action error increment the error count for instrumentation
237         */
238    private void incrActionErrorCounter(String type, String error, int count) {
239        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
240    }
241
242        /**
243         * Increment the action counter in the instrumentation log. indicating how
244         * many times the action was executed since the start Oozie server
245         */
246    protected void incrActionCounter(String type, int count) {
247        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
248    }
249
250        /**
251         * Adding a cron for the instrumentation time for the given Instrumentation
252         * group
253         */
254    protected void addActionCron(String type, Instrumentation.Cron cron) {
255        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
256    }
257
258    /**
259     * Workflow action executor context
260     *
261     */
262    public static class ActionExecutorContext implements ActionExecutor.Context {
263        private final WorkflowJobBean workflow;
264        private Configuration protoConf;
265        private final WorkflowActionBean action;
266        private final boolean isRetry;
267        private final boolean isUserRetry;
268        private boolean started;
269        private boolean ended;
270        private boolean executed;
271
272                /**
273                 * Constructing the ActionExecutorContext, setting the private members
274                 * and constructing the proto configuration
275                 */
276        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
277            this.workflow = workflow;
278            this.action = action;
279            this.isRetry = isRetry;
280            this.isUserRetry = isUserRetry;
281            try {
282                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
283            }
284            catch (IOException ex) {
285                throw new RuntimeException("It should not happen", ex);
286            }
287        }
288
289        /*
290         * (non-Javadoc)
291         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
292         */
293        public String getCallbackUrl(String externalStatusVar) {
294            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
295        }
296
297        /*
298         * (non-Javadoc)
299         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
300         */
301        public Configuration getProtoActionConf() {
302            return protoConf;
303        }
304
305        /*
306         * (non-Javadoc)
307         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
308         */
309        public WorkflowJob getWorkflow() {
310            return workflow;
311        }
312
313        /**
314         * Returns the workflow action of the given action context
315         *
316         * @return the workflow action of the given action context
317         */
318        public WorkflowAction getAction() {
319            return action;
320        }
321
322        /*
323         * (non-Javadoc)
324         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
325         */
326        public ELEvaluator getELEvaluator() {
327            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
328            DagELFunctions.configureEvaluator(evaluator, workflow, action);
329            return evaluator;
330        }
331
332        /*
333         * (non-Javadoc)
334         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
335         */
336        public void setVar(String name, String value) {
337            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
338            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
339            wfInstance.setVar(name, value);
340            workflow.setWorkflowInstance(wfInstance);
341        }
342
343        /*
344         * (non-Javadoc)
345         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
346         */
347        public String getVar(String name) {
348            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
349            return workflow.getWorkflowInstance().getVar(name);
350        }
351
352        /*
353         * (non-Javadoc)
354         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
355         */
356        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
357            action.setStartData(externalId, trackerUri, consoleUrl);
358            started = true;
359        }
360
361        /**
362         * Setting the start time of the action
363         */
364        public void setStartTime() {
365            Date now = new Date();
366            action.setStartTime(now);
367        }
368
369        /*
370         * (non-Javadoc)
371         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
372         */
373        public void setExecutionData(String externalStatus, Properties actionData) {
374            action.setExecutionData(externalStatus, actionData);
375            executed = true;
376        }
377
378        /*
379         * (non-Javadoc)
380         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
381         */
382        public void setExecutionStats(String jsonStats) {
383            action.setExecutionStats(jsonStats);
384            executed = true;
385        }
386
387        /*
388         * (non-Javadoc)
389         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
390         */
391        public void setExternalChildIDs(String externalChildIDs) {
392            action.setExternalChildIDs(externalChildIDs);
393            executed = true;
394        }
395
396        /*
397         * (non-Javadoc)
398         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
399         */
400        public void setEndData(WorkflowAction.Status status, String signalValue) {
401            action.setEndData(status, signalValue);
402            ended = true;
403        }
404
405        /*
406         * (non-Javadoc)
407         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
408         */
409        public boolean isRetry() {
410            return isRetry;
411        }
412
413        /**
414         * Return if the executor invocation is a user retry or not.
415         *
416         * @return if the executor invocation is a user retry or not.
417         */
418        public boolean isUserRetry() {
419            return isUserRetry;
420        }
421
422        /**
423         * Returns whether setStartData has been called or not.
424         *
425         * @return true if start completion info has been set.
426         */
427        public boolean isStarted() {
428            return started;
429        }
430
431        /**
432         * Returns whether setExecutionData has been called or not.
433         *
434         * @return true if execution completion info has been set, otherwise false.
435         */
436        public boolean isExecuted() {
437            return executed;
438        }
439
440        /**
441         * Returns whether setEndData has been called or not.
442         *
443         * @return true if end completion info has been set.
444         */
445        public boolean isEnded() {
446            return ended;
447        }
448
449        public void setExternalStatus(String externalStatus) {
450            action.setExternalStatus(externalStatus);
451        }
452
453        @Override
454        public String getRecoveryId() {
455            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
456        }
457
458        /* (non-Javadoc)
459         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
460         */
461        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
462            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
463            FileSystem fs = getAppFileSystem();
464            String actionDirPath = Services.get().getSystemId() + "/" + name;
465            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
466            return fqActionDir;
467        }
468
469        /* (non-Javadoc)
470         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
471         */
472        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
473            WorkflowJob workflow = getWorkflow();
474            URI uri = new URI(getWorkflow().getAppPath());
475            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
476            Configuration fsConf = has.createJobConf(uri.getAuthority());
477            return has.createFileSystem(workflow.getUser(), uri, fsConf);
478
479        }
480
481        /* (non-Javadoc)
482         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
483         */
484        @Override
485        public void setErrorInfo(String str, String exMsg) {
486            action.setErrorInfo(str, exMsg);
487        }
488    }
489
490}