001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.SLAEventBean;
026import org.apache.oozie.WorkflowActionBean;
027import org.apache.oozie.WorkflowJobBean;
028import org.apache.oozie.XException;
029import org.apache.oozie.client.SLAEvent.SlaAppType;
030import org.apache.oozie.client.SLAEvent.Status;
031import org.apache.oozie.client.rest.JsonBean;
032import org.apache.oozie.command.CommandException;
033import org.apache.oozie.command.PreconditionException;
034import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
037import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
038import org.apache.oozie.action.ActionExecutor;
039import org.apache.oozie.action.ActionExecutorException;
040import org.apache.oozie.action.control.ControlNodeActionExecutor;
041import org.apache.oozie.service.ActionService;
042import org.apache.oozie.service.EventHandlerService;
043import org.apache.oozie.service.JPAService;
044import org.apache.oozie.service.UUIDService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.util.LogUtils;
047import org.apache.oozie.util.Instrumentation;
048import org.apache.oozie.util.db.SLADbXOperations;
049
050/**
051 * Kill workflow action and invoke action executor to kill the underlying context.
052 *
053 */
054@SuppressWarnings("deprecation")
055public class ActionKillXCommand extends ActionXCommand<Void> {
056    private String actionId;
057    private String jobId;
058    private WorkflowJobBean wfJob;
059    private WorkflowActionBean wfAction;
060    private JPAService jpaService = null;
061    private List<JsonBean> updateList = new ArrayList<JsonBean>();
062    private List<JsonBean> insertList = new ArrayList<JsonBean>();
063
064    public ActionKillXCommand(String actionId, String type) {
065        super("action.kill", type, 0);
066        this.actionId = actionId;
067        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
068    }
069
070    public ActionKillXCommand(String actionId) {
071        this(actionId, "action.kill");
072    }
073
074    @Override
075    protected boolean isLockRequired() {
076        return true;
077    }
078
079    @Override
080    public String getEntityKey() {
081        return this.jobId;
082    }
083
084    @Override
085    public String getKey() {
086        return getName() + "_" + this.actionId;
087    }
088
089    @Override
090    protected void loadState() throws CommandException {
091        try {
092            jpaService = Services.get().get(JPAService.class);
093
094            if (jpaService != null) {
095                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
096                this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
097                LogUtils.setLogInfo(wfJob, logInfo);
098                LogUtils.setLogInfo(wfAction, logInfo);
099            }
100            else {
101                throw new CommandException(ErrorCode.E0610);
102            }
103        }
104        catch (XException ex) {
105            throw new CommandException(ex);
106        }
107    }
108
109    @Override
110    protected void verifyPrecondition() throws CommandException, PreconditionException {
111        if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) {
112            throw new PreconditionException(ErrorCode.E0726, wfAction.getId());
113        }
114    }
115
116    @Override
117    protected Void execute() throws CommandException {
118        LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId);
119
120        if (wfAction.isPending()) {
121            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
122            if (executor != null) {
123                try {
124                    boolean isRetry = false;
125                    boolean isUserRetry = false;
126                    ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction,
127                            isRetry, isUserRetry);
128                    incrActionCounter(wfAction.getType(), 1);
129
130                    Instrumentation.Cron cron = new Instrumentation.Cron();
131                    cron.start();
132                    executor.kill(context, wfAction);
133                    cron.stop();
134                    addActionCron(wfAction.getType(), cron);
135
136                    wfAction.resetPending();
137                    wfAction.setStatus(WorkflowActionBean.Status.KILLED);
138                    wfAction.setEndTime(new Date());
139
140                    updateList.add(wfAction);
141                    wfJob.setLastModifiedTime(new Date());
142                    updateList.add(wfJob);
143                    // Add SLA status event (KILLED) for WF_ACTION
144                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
145                            SlaAppType.WORKFLOW_ACTION);
146                    if(slaEvent != null) {
147                        insertList.add(slaEvent);
148                    }
149                    queue(new NotificationXCommand(wfJob, wfAction));
150                }
151                catch (ActionExecutorException ex) {
152                    wfAction.resetPending();
153                    wfAction.setStatus(WorkflowActionBean.Status.FAILED);
154                    wfAction.setErrorInfo(ex.getErrorCode().toString(),
155                            "KILL COMMAND FAILED - exception while executing job kill");
156                    wfAction.setEndTime(new Date());
157
158                    wfJob.setStatus(WorkflowJobBean.Status.KILLED);
159                    updateList.add(wfAction);
160                    wfJob.setLastModifiedTime(new Date());
161                    updateList.add(wfJob);
162                    // What will happen to WF and COORD_ACTION, NOTIFICATION?
163                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
164                            SlaAppType.WORKFLOW_ACTION);
165                    if(slaEvent != null) {
166                        insertList.add(slaEvent);
167                    }
168                    LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
169                            ex.getErrorCode(), ex.getMessage(), ex);
170                }
171                finally {
172                    try {
173                        jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
174                        if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
175                            generateEvent(wfAction, wfJob.getUser());
176                        }
177                    }
178                    catch (JPAExecutorException e) {
179                        throw new CommandException(e);
180                    }
181                }
182            }
183        }
184        LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId);
185        return null;
186    }
187
188}