001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.WorkflowActionBean;
026import org.apache.oozie.WorkflowJobBean;
027import org.apache.oozie.client.WorkflowJob;
028import org.apache.oozie.client.rest.JsonBean;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.command.PreconditionException;
031import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
032import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
033import org.apache.oozie.executor.jpa.JPAExecutorException;
034import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
035import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
036import org.apache.oozie.service.EventHandlerService;
037import org.apache.oozie.service.JPAService;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.util.InstrumentUtils;
040import org.apache.oozie.util.LogUtils;
041import org.apache.oozie.util.ParamChecker;
042import org.apache.oozie.workflow.WorkflowException;
043import org.apache.oozie.workflow.WorkflowInstance;
044import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
045
046public class SuspendXCommand extends WorkflowXCommand<Void> {
047    private final String wfid;
048    private WorkflowJobBean wfJobBean;
049    private JPAService jpaService;
050    private List<JsonBean> updateList = new ArrayList<JsonBean>();
051
052    public SuspendXCommand(String id) {
053        super("suspend", "suspend", 1);
054        this.wfid = ParamChecker.notEmpty(id, "wfid");
055    }
056
057    @Override
058    protected Void execute() throws CommandException {
059        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
060        try {
061            suspendJob(this.jpaService, this.wfJobBean, this.wfid, null, updateList);
062            this.wfJobBean.setLastModifiedTime(new Date());
063            updateList.add(this.wfJobBean);
064            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
065            queue(new NotificationXCommand(this.wfJobBean));
066        }
067        catch (WorkflowException e) {
068            throw new CommandException(e);
069        }
070        catch (JPAExecutorException je) {
071            throw new CommandException(je);
072        }
073        finally {
074            // update coordinator action
075            new CoordActionUpdateXCommand(wfJobBean).call();
076        }
077        return null;
078    }
079
080    /**
081     * Suspend the workflow job and pending flag to false for the actions that are START_RETRY or START_MANUAL or
082     * END_RETRY or END_MANUAL
083     *
084     * @param jpaService jpa service
085     * @param workflow workflow job
086     * @param id workflow job id
087     * @param actionId workflow action id
088     * @throws WorkflowException thrown if failed to suspend workflow instance
089     * @throws CommandException thrown if unable set pending false for actions
090     */
091    public static void suspendJob(JPAService jpaService, WorkflowJobBean workflow, String id,
092            String actionId, List<JsonBean> updateList) throws WorkflowException, CommandException {
093        if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
094            workflow.getWorkflowInstance().suspend();
095            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
096            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
097            workflow.setStatus(WorkflowJob.Status.SUSPENDED);
098            workflow.setWorkflowInstance(wfInstance);
099
100            setPendingFalseForActions(jpaService, id, actionId, updateList);
101            if (EventHandlerService.isEnabled()) {
102                generateEvent(workflow);
103            }
104        }
105    }
106
107    /**
108     * Set pending flag to false for the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL
109     * <p/>
110     *
111     * @param jpaService jpa service
112     * @param id workflow job id
113     * @param actionId workflow action id
114     * @throws CommandException thrown if failed to update workflow action
115     */
116    private static void setPendingFalseForActions(JPAService jpaService, String id, String actionId,
117            List<JsonBean> updateList) throws CommandException {
118        List<WorkflowActionBean> actions;
119        try {
120            actions = jpaService.execute(new WorkflowActionRetryManualGetJPAExecutor(id));
121
122            for (WorkflowActionBean action : actions) {
123                if (actionId != null && actionId.equals(action.getId())) {
124                    // this action has been changed in handleNonTransient()
125                    continue;
126                }
127                else {
128                    action.resetPendingOnly();
129                }
130                if (updateList != null) { // will be null when suspendJob
131                                          // invoked statically via
132                                          // handleNonTransient()
133                    updateList.add(action);
134                }
135            }
136        }
137        catch (JPAExecutorException je) {
138            throw new CommandException(je);
139        }
140    }
141
142    @Override
143    protected void eagerLoadState() throws CommandException {
144        super.eagerLoadState();
145        try {
146            jpaService = Services.get().get(JPAService.class);
147            if (jpaService != null) {
148                this.wfJobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.wfid));
149            }
150            else {
151                throw new CommandException(ErrorCode.E0610);
152            }
153        }
154        catch (Exception ex) {
155            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
156        }
157        LogUtils.setLogInfo(this.wfJobBean, logInfo);
158    }
159
160    @Override
161    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
162        super.eagerVerifyPrecondition();
163        if (this.wfJobBean.getStatus() != WorkflowJob.Status.RUNNING) {
164            throw new PreconditionException(ErrorCode.E0727, this.wfJobBean.getId(), this.wfJobBean.getStatus());
165        }
166    }
167
168    @Override
169    public String getEntityKey() {
170        return this.wfid;
171    }
172
173    @Override
174    public String getKey() {
175        return getName() + "_" + this.wfid;
176    }
177
178    @Override
179    protected boolean isLockRequired() {
180        return true;
181    }
182
183    @Override
184    protected void loadState() throws CommandException {
185        eagerLoadState();
186    }
187
188    @Override
189    protected void verifyPrecondition() throws CommandException, PreconditionException {
190        eagerVerifyPrecondition();
191    }
192}