001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.DagELFunctions;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.SLAEventBean;
028import org.apache.oozie.WorkflowActionBean;
029import org.apache.oozie.WorkflowJobBean;
030import org.apache.oozie.XException;
031import org.apache.oozie.action.ActionExecutor;
032import org.apache.oozie.action.ActionExecutorException;
033import org.apache.oozie.action.control.ControlNodeActionExecutor;
034import org.apache.oozie.client.OozieClient;
035import org.apache.oozie.client.WorkflowAction;
036import org.apache.oozie.client.WorkflowJob;
037import org.apache.oozie.client.SLAEvent.SlaAppType;
038import org.apache.oozie.client.SLAEvent.Status;
039import org.apache.oozie.client.rest.JsonBean;
040import org.apache.oozie.command.CommandException;
041import org.apache.oozie.command.PreconditionException;
042import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
045import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
046import org.apache.oozie.service.ActionService;
047import org.apache.oozie.service.EventHandlerService;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.service.UUIDService;
051import org.apache.oozie.util.Instrumentation;
052import org.apache.oozie.util.LogUtils;
053import org.apache.oozie.util.XLog;
054import org.apache.oozie.util.db.SLADbXOperations;
055import org.apache.oozie.workflow.WorkflowInstance;
056
057@SuppressWarnings("deprecation")
058public class ActionEndXCommand extends ActionXCommand<Void> {
059    public static final String COULD_NOT_END = "COULD_NOT_END";
060    public static final String END_DATA_MISSING = "END_DATA_MISSING";
061
062    private String jobId = null;
063    private String actionId = null;
064    private WorkflowJobBean wfJob = null;
065    private WorkflowActionBean wfAction = null;
066    private JPAService jpaService = null;
067    private ActionExecutor executor = null;
068    private List<JsonBean> updateList = new ArrayList<JsonBean>();
069    private List<JsonBean> insertList = new ArrayList<JsonBean>();
070
071    public ActionEndXCommand(String actionId, String type) {
072        super("action.end", type, 0);
073        this.actionId = actionId;
074        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
075    }
076
077    @Override
078    protected boolean isLockRequired() {
079        return true;
080    }
081
082    @Override
083    public String getEntityKey() {
084        return this.jobId;
085    }
086
087    @Override
088    public String getKey() {
089        return getName() + "_" + actionId;
090    }
091
092    @Override
093    protected void loadState() throws CommandException {
094        try {
095            jpaService = Services.get().get(JPAService.class);
096            if (jpaService != null) {
097                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
098                this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
099                LogUtils.setLogInfo(wfJob, logInfo);
100                LogUtils.setLogInfo(wfAction, logInfo);
101            }
102            else {
103                throw new CommandException(ErrorCode.E0610);
104            }
105        }
106        catch (XException ex) {
107            throw new CommandException(ex);
108        }
109    }
110
111    @Override
112    protected void verifyPrecondition() throws CommandException, PreconditionException {
113        if (wfJob == null) {
114            throw new PreconditionException(ErrorCode.E0604, jobId);
115        }
116        if (wfAction == null) {
117            throw new PreconditionException(ErrorCode.E0605, actionId);
118        }
119        if (wfAction.isPending()
120                && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
121                        || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
122
123            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
124                throw new PreconditionException(ErrorCode.E0811,  WorkflowJob.Status.RUNNING.toString());
125            }
126        }
127        else {
128            throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
129        }
130
131        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
132        if (executor == null) {
133            throw new CommandException(ErrorCode.E0802, wfAction.getType());
134        }
135    }
136
137    @Override
138    protected Void execute() throws CommandException {
139        LOG.debug("STARTED ActionEndXCommand for action " + actionId);
140
141        Configuration conf = wfJob.getWorkflowInstance().getConf();
142
143        int maxRetries = 0;
144        long retryInterval = 0;
145
146        if (!(executor instanceof ControlNodeActionExecutor)) {
147            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
148            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
149        }
150
151        executor.setMaxRetries(maxRetries);
152        executor.setRetryInterval(retryInterval);
153
154        boolean isRetry = false;
155        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
156                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
157            isRetry = true;
158        }
159        boolean isUserRetry = false;
160        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
161        try {
162
163            LOG.debug(
164                    "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
165                    wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
166                    wfAction.getSignalValue());
167
168            Instrumentation.Cron cron = new Instrumentation.Cron();
169            cron.start();
170            executor.end(context, wfAction);
171            cron.stop();
172            addActionCron(wfAction.getType(), cron);
173
174            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
175            DagELFunctions.setActionInfo(wfInstance, wfAction);
176            wfJob.setWorkflowInstance(wfInstance);
177            incrActionCounter(wfAction.getType(), 1);
178
179            if (!context.isEnded()) {
180                LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
181                        executor.getType());
182                wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
183                failJob(context);
184            } else {
185                wfAction.setRetries(0);
186                wfAction.setEndTime(new Date());
187
188                boolean shouldHandleUserRetry = false;
189                Status slaStatus = null;
190                switch (wfAction.getStatus()) {
191                    case OK:
192                        slaStatus = Status.SUCCEEDED;
193                        break;
194                    case KILLED:
195                        slaStatus = Status.KILLED;
196                        break;
197                    case FAILED:
198                        slaStatus = Status.FAILED;
199                        shouldHandleUserRetry = true;
200                        break;
201                    case ERROR:
202                        LOG.info("ERROR is considered as FAILED for SLA");
203                        slaStatus = Status.KILLED;
204                        shouldHandleUserRetry = true;
205                        break;
206                    default:
207                        slaStatus = Status.FAILED;
208                        shouldHandleUserRetry = true;
209                        break;
210                }
211                if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
212                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
213                    LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
214                            + ", Set pending=" + wfAction.getPending());
215                    if(slaEvent != null) {
216                        insertList.add(slaEvent);
217                    }
218                    queue(new SignalXCommand(jobId, actionId));
219                }
220            }
221            updateList.add(wfAction);
222            wfJob.setLastModifiedTime(new Date());
223            updateList.add(wfJob);
224        }
225        catch (ActionExecutorException ex) {
226            LOG.warn(
227                    "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
228                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
229            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
230            wfAction.setEndTime(null);
231
232            switch (ex.getErrorType()) {
233                case TRANSIENT:
234                    if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
235                        handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
236                        wfAction.setPendingAge(new Date());
237                        wfAction.setRetries(0);
238                    }
239                    wfAction.setEndTime(null);
240                    break;
241                case NON_TRANSIENT:
242                    handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
243                    wfAction.setEndTime(null);
244                    break;
245                case ERROR:
246                    handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
247                    queue(new SignalXCommand(jobId, actionId));
248                    break;
249                case FAILED:
250                    failJob(context);
251                    break;
252            }
253
254            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
255            DagELFunctions.setActionInfo(wfInstance, wfAction);
256            wfJob.setWorkflowInstance(wfInstance);
257
258            updateList.add(wfAction);
259            wfJob.setLastModifiedTime(new Date());
260            updateList.add(wfJob);
261        }
262        finally {
263            try {
264                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
265                if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
266                    generateEvent(wfAction, wfJob.getUser());
267                }
268            }
269            catch (JPAExecutorException e) {
270                throw new CommandException(e);
271            }
272        }
273
274        LOG.debug("ENDED ActionEndXCommand for action " + actionId);
275        return null;
276    }
277
278}