001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import org.apache.oozie.client.CoordinatorAction;
021import org.apache.oozie.client.CoordinatorJob;
022import org.apache.oozie.client.Job;
023import org.apache.oozie.CoordinatorActionBean;
024import org.apache.oozie.CoordinatorJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.XException;
027import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
028import org.apache.oozie.command.wf.KillXCommand;
029import org.apache.oozie.command.CommandException;
030import org.apache.oozie.command.KillTransitionXCommand;
031import org.apache.oozie.command.PreconditionException;
032import org.apache.oozie.dependency.DependencyChecker;
033import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
034import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
035import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
036import org.apache.oozie.executor.jpa.JPAExecutorException;
037import org.apache.oozie.service.EventHandlerService;
038import org.apache.oozie.service.JPAService;
039import org.apache.oozie.service.Services;
040import org.apache.oozie.util.LogUtils;
041import org.apache.oozie.util.ParamChecker;
042import org.apache.oozie.util.StatusUtils;
043
044import java.util.Arrays;
045import java.util.Date;
046import java.util.List;
047
048public class CoordKillXCommand extends KillTransitionXCommand {
049
050    private final String jobId;
051    private CoordinatorJobBean coordJob;
052    private List<CoordinatorActionBean> actionList;
053    private JPAService jpaService = null;
054    private CoordinatorJob.Status prevStatus = null;
055
056    public CoordKillXCommand(String id) {
057        super("coord_kill", "coord_kill", 2);
058        this.jobId = ParamChecker.notEmpty(id, "id");
059    }
060
061    @Override
062    protected boolean isLockRequired() {
063        return true;
064    }
065
066    @Override
067    public String getEntityKey() {
068        return this.jobId;
069    }
070
071    @Override
072    public String getKey() {
073        return getName() + "_" + this.jobId;
074    }
075
076    @Override
077    protected void loadState() throws CommandException {
078        try {
079            jpaService = Services.get().get(JPAService.class);
080
081            if (jpaService != null) {
082                this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
083                //Get actions which are not succeeded, failed, timed out or killed
084                this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId));
085                prevStatus = coordJob.getStatus();
086                LogUtils.setLogInfo(coordJob, logInfo);
087            }
088            else {
089                throw new CommandException(ErrorCode.E0610);
090            }
091        }
092        catch (XException ex) {
093            throw new CommandException(ex);
094        }
095    }
096
097    @Override
098    protected void verifyPrecondition() throws CommandException, PreconditionException {
099        // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
100        if (StatusUtils.isV1CoordjobKillable(coordJob)) {
101            return;
102        }
103        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
104                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
105                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
106                || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
107            LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
108                    + jobId + ", status = " + coordJob.getStatus());
109            throw new PreconditionException(ErrorCode.E1020, jobId);
110        }
111    }
112
113    private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
114        CoordinatorAction.Status prevStatus = action.getStatus();
115        action.setStatus(CoordinatorActionBean.Status.KILLED);
116        if (makePending) {
117            action.incrementAndGetPending();
118        } else {
119            // set pending to false
120            action.setPending(0);
121        }
122        if (EventHandlerService.isEnabled() && prevStatus != CoordinatorAction.Status.RUNNING
123                && prevStatus != CoordinatorAction.Status.SUSPENDED) {
124            CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null);
125        }
126        action.setLastModifiedTime(new Date());
127        updateList.add(action);
128    }
129
130    @Override
131    public void killChildren() throws CommandException {
132        if (actionList != null) {
133            for (CoordinatorActionBean action : actionList) {
134                // queue a WorkflowKillXCommand to delete the workflow job and actions
135                if (action.getExternalId() != null) {
136                    queue(new KillXCommand(action.getExternalId()));
137                    // As the kill command for children is queued, set pending flag for coord action to be true
138                    updateCoordAction(action, true);
139                    LOG.debug(
140                            "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
141                            action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
142                }
143                else {
144                    // As killing children is not required, set pending flag for coord action to be false
145                    updateCoordAction(action, false);
146                    LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]",
147                            action.getId(), action.getStatus(), action.getPending());
148                }
149                String pushMissingDeps = action.getPushMissingDependencies();
150                if (pushMissingDeps != null) {
151                    CoordPushDependencyCheckXCommand.unregisterMissingDependencies(
152                            Arrays.asList(DependencyChecker.dependenciesAsArray(pushMissingDeps)), action.getId());
153                }
154            }
155        }
156        coordJob.setDoneMaterialization();
157        updateList.add(coordJob);
158
159        LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
160    }
161
162    @Override
163    public void notifyParent() throws CommandException {
164        // update bundle action
165        if (coordJob.getBundleId() != null) {
166            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
167            bundleStatusUpdate.call();
168        }
169    }
170
171    @Override
172    public void updateJob() throws CommandException {
173        updateList.add(coordJob);
174    }
175
176    @Override
177    public void performWrites() throws CommandException {
178        try {
179            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
180        }
181        catch (JPAExecutorException e) {
182            throw new CommandException(e);
183        }
184    }
185
186    @Override
187    public Job getJob() {
188        return coordJob;
189    }
190
191}