001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.WorkflowActionBean;
027import org.apache.oozie.WorkflowJobBean;
028import org.apache.oozie.XException;
029import org.apache.oozie.action.ActionExecutor;
030import org.apache.oozie.action.ActionExecutorException;
031import org.apache.oozie.client.WorkflowAction;
032import org.apache.oozie.client.WorkflowJob;
033import org.apache.oozie.client.rest.JsonBean;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.command.PreconditionException;
036import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
037import org.apache.oozie.executor.jpa.JPAExecutorException;
038import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
039import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
040import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
041import org.apache.oozie.service.ActionCheckerService;
042import org.apache.oozie.service.ActionService;
043import org.apache.oozie.service.EventHandlerService;
044import org.apache.oozie.service.JPAService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.service.UUIDService;
047import org.apache.oozie.util.Instrumentation;
048import org.apache.oozie.util.LogUtils;
049import org.apache.oozie.util.XLog;
050
051/**
052 * Executes the check command for ActionHandlers. </p> Ensures the action is in
053 * RUNNING state before executing
054 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
055 */
056public class ActionCheckXCommand extends ActionXCommand<Void> {
057    public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
058    private String actionId;
059    private String jobId;
060    private int actionCheckDelay;
061    private WorkflowJobBean wfJob = null;
062    private WorkflowActionBean wfAction = null;
063    private JPAService jpaService = null;
064    private ActionExecutor executor = null;
065    private List<JsonBean> updateList = new ArrayList<JsonBean>();
066    private boolean generateEvent = false;
067
068    public ActionCheckXCommand(String actionId) {
069        this(actionId, -1);
070    }
071
072    public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
073        super("action.check", "action.check", priority);
074        this.actionId = actionId;
075        this.actionCheckDelay = checkDelay;
076        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
077    }
078
079    public ActionCheckXCommand(String actionId, int checkDelay) {
080        this(actionId, 0, checkDelay);
081    }
082
083    @Override
084    protected void eagerLoadState() throws CommandException {
085        try {
086            jpaService = Services.get().get(JPAService.class);
087            if (jpaService != null) {
088                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
089                this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
090                LogUtils.setLogInfo(wfJob, logInfo);
091                LogUtils.setLogInfo(wfAction, logInfo);
092            }
093            else {
094                throw new CommandException(ErrorCode.E0610);
095            }
096        }
097        catch (XException ex) {
098            throw new CommandException(ex);
099        }
100    }
101
102    @Override
103    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
104        if (wfJob == null) {
105            throw new PreconditionException(ErrorCode.E0604, jobId);
106        }
107        if (wfAction == null) {
108            throw new PreconditionException(ErrorCode.E0605, actionId);
109        }
110        // if the action has been updated, quit this command
111        if (actionCheckDelay > 0) {
112            Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
113            Timestamp actionLmt = wfAction.getLastCheckTimestamp();
114            if (actionLmt.after(actionCheckTs)) {
115                throw new PreconditionException(ErrorCode.E0817, actionId);
116            }
117        }
118
119        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
120        if (executor == null) {
121            throw new CommandException(ErrorCode.E0802, wfAction.getType());
122        }
123    }
124
125    @Override
126    protected boolean isLockRequired() {
127        return true;
128    }
129
130    @Override
131    public String getEntityKey() {
132        return this.jobId;
133    }
134
135    @Override
136    protected void loadState() throws CommandException {
137        eagerLoadState();
138    }
139
140    @Override
141    protected void verifyPrecondition() throws CommandException, PreconditionException {
142        if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
143            throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
144        }
145        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
146            wfAction.setLastCheckTime(new Date());
147            try {
148                jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
149            }
150            catch (JPAExecutorException e) {
151                throw new CommandException(e);
152            }
153            throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
154        }
155    }
156
157    @Override
158    protected Void execute() throws CommandException {
159        LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
160
161        long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor
162                .getRetryInterval());
163        executor.setRetryInterval(retryInterval);
164
165        ActionExecutorContext context = null;
166        try {
167            boolean isRetry = false;
168            if (wfAction.getRetries() > 0) {
169                isRetry = true;
170            }
171            boolean isUserRetry = false;
172            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
173            incrActionCounter(wfAction.getType(), 1);
174
175            Instrumentation.Cron cron = new Instrumentation.Cron();
176            cron.start();
177            executor.check(context, wfAction);
178            cron.stop();
179            addActionCron(wfAction.getType(), cron);
180
181            if (wfAction.isExecutionComplete()) {
182                if (!context.isExecuted()) {
183                    LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
184                            .getType());
185                    wfAction.setErrorInfo(EXEC_DATA_MISSING,
186                            "Execution Complete, but Execution Data Missing from Action");
187                    failJob(context);
188                    generateEvent = true;
189                } else {
190                    wfAction.setPending();
191                    queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
192                }
193            }
194            wfAction.setLastCheckTime(new Date());
195            updateList.add(wfAction);
196            wfJob.setLastModifiedTime(new Date());
197            updateList.add(wfJob);
198        }
199        catch (ActionExecutorException ex) {
200            LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
201                    .getMessage(), ex);
202
203            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
204            switch (ex.getErrorType()) {
205                case FAILED:
206                    failJob(context, wfAction);
207                    generateEvent = true;
208                    break;
209                case ERROR:
210                    handleUserRetry(wfAction);
211                    break;
212                case TRANSIENT:                 // retry N times, then suspend workflow
213                    if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
214                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
215                        generateEvent = true;
216                        wfAction.setPendingAge(new Date());
217                        wfAction.setRetries(0);
218                        wfAction.setStartTime(null);
219                    }
220                    break;
221            }
222            wfAction.setLastCheckTime(new Date());
223            updateList = new ArrayList<JsonBean>();
224            updateList.add(wfAction);
225            wfJob.setLastModifiedTime(new Date());
226            updateList.add(wfJob);
227        }
228        finally {
229            try {
230                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
231                if (generateEvent && EventHandlerService.isEnabled()) {
232                    generateEvent(wfAction, wfJob.getUser());
233                }
234            }
235            catch (JPAExecutorException e) {
236                throw new CommandException(e);
237            }
238        }
239
240        LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
241        return null;
242    }
243
244    protected long getRetryInterval() {
245        return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL;
246    }
247
248    @Override
249    public String getKey() {
250        return getName() + "_" + actionId;
251    }
252
253}