001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.oozie.action.control.ControlNodeActionExecutor;
021import org.apache.oozie.client.WorkflowJob;
022import org.apache.oozie.client.SLAEvent.SlaAppType;
023import org.apache.oozie.client.SLAEvent.Status;
024import org.apache.oozie.client.rest.JsonBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.SLAEventBean;
027import org.apache.oozie.WorkflowActionBean;
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.XException;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.PreconditionException;
032import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
033import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
036import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
037import org.apache.oozie.service.ActionService;
038import org.apache.oozie.service.EventHandlerService;
039import org.apache.oozie.service.JPAService;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.workflow.WorkflowException;
042import org.apache.oozie.workflow.WorkflowInstance;
043import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
044import org.apache.oozie.util.InstrumentUtils;
045import org.apache.oozie.util.LogUtils;
046import org.apache.oozie.util.ParamChecker;
047import org.apache.oozie.util.db.SLADbXOperations;
048
049import java.util.ArrayList;
050import java.util.Date;
051import java.util.List;
052
053/**
054 * Kill workflow job and its workflow instance and queue a {@link WorkflowActionKillXCommand} to kill the workflow
055 * actions.
056 */
057@SuppressWarnings("deprecation")
058public class KillXCommand extends WorkflowXCommand<Void> {
059
060    private String wfId;
061    private WorkflowJobBean wfJob;
062    private List<WorkflowActionBean> actionList;
063    private ActionService actionService;
064    private JPAService jpaService = null;
065    private List<JsonBean> updateList = new ArrayList<JsonBean>();
066    private List<JsonBean> insertList = new ArrayList<JsonBean>();
067
068    public KillXCommand(String wfId) {
069        super("kill", "kill", 1);
070        this.wfId = ParamChecker.notEmpty(wfId, "wfId");
071    }
072
073    @Override
074    protected boolean isLockRequired() {
075        return true;
076    }
077
078    @Override
079    public String getEntityKey() {
080        return this.wfId;
081    }
082
083    @Override
084    public String getKey() {
085        return getName() + "_" + this.wfId;
086    }
087
088    @Override
089    protected void loadState() throws CommandException {
090        try {
091            jpaService = Services.get().get(JPAService.class);
092            if (jpaService != null) {
093                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
094                this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId));
095                LogUtils.setLogInfo(wfJob, logInfo);
096            }
097            else {
098                throw new CommandException(ErrorCode.E0610);
099            }
100            actionService = Services.get().get(ActionService.class);
101        }
102        catch (XException ex) {
103            throw new CommandException(ex);
104        }
105    }
106
107    @Override
108    protected void verifyPrecondition() throws CommandException, PreconditionException {
109        if (wfJob.getStatus() != WorkflowJob.Status.PREP && wfJob.getStatus() != WorkflowJob.Status.RUNNING
110                && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED && wfJob.getStatus() != WorkflowJob.Status.FAILED) {
111            throw new PreconditionException(ErrorCode.E0725, wfJob.getId());
112        }
113    }
114
115    @Override
116    protected Void execute() throws CommandException {
117        LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId);
118
119        wfJob.setEndTime(new Date());
120
121        if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
122            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
123            wfJob.setStatus(WorkflowJob.Status.KILLED);
124            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(),
125                    Status.KILLED, SlaAppType.WORKFLOW_JOB);
126            if(slaEvent != null) {
127                insertList.add(slaEvent);
128            }
129            try {
130                wfJob.getWorkflowInstance().kill();
131            }
132            catch (WorkflowException e) {
133                throw new CommandException(ErrorCode.E0725, e.getMessage(), e);
134            }
135            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
136            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED);
137            wfJob.setWorkflowInstance(wfInstance);
138        }
139        try {
140            for (WorkflowActionBean action : actionList) {
141                if (action.getStatus() == WorkflowActionBean.Status.RUNNING
142                        || action.getStatus() == WorkflowActionBean.Status.DONE) {
143                    action.setPending();
144                    action.setStatus(WorkflowActionBean.Status.KILLED);
145
146                    updateList.add(action);
147
148                    queue(new ActionKillXCommand(action.getId(), action.getType()));
149                }
150                else if (action.getStatus() == WorkflowActionBean.Status.PREP
151                        || action.getStatus() == WorkflowActionBean.Status.START_RETRY
152                        || action.getStatus() == WorkflowActionBean.Status.START_MANUAL
153                        || action.getStatus() == WorkflowActionBean.Status.END_RETRY
154                        || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
155
156                    action.setStatus(WorkflowActionBean.Status.KILLED);
157                    action.resetPending();
158                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
159                            Status.KILLED, SlaAppType.WORKFLOW_ACTION);
160                    if(slaEvent != null) {
161                        insertList.add(slaEvent);
162                    }
163                    updateList.add(action);
164                    if (EventHandlerService.isEnabled()
165                            && !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
166                        generateEvent(action, wfJob.getUser());
167                    }
168                }
169            }
170            wfJob.setLastModifiedTime(new Date());
171            updateList.add(wfJob);
172            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
173            if (EventHandlerService.isEnabled()) {
174                generateEvent(wfJob);
175            }
176            queue(new NotificationXCommand(wfJob));
177        }
178        catch (JPAExecutorException e) {
179            throw new CommandException(e);
180        }
181        finally {
182            if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {
183                 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
184            }
185            // update coordinator action
186            new CoordActionUpdateXCommand(wfJob).call();
187        }
188
189        LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId);
190        return null;
191    }
192
193}