001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.oozie.BundleActionBean;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.XException;
027import org.apache.oozie.client.Job;
028import org.apache.oozie.command.CommandException;
029import org.apache.oozie.command.KillTransitionXCommand;
030import org.apache.oozie.command.PreconditionException;
031import org.apache.oozie.command.coord.CoordKillXCommand;
032import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
034import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
035import org.apache.oozie.executor.jpa.JPAExecutorException;
036import org.apache.oozie.service.JPAService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.util.LogUtils;
039import org.apache.oozie.util.ParamChecker;
040
041public class BundleKillXCommand extends KillTransitionXCommand {
042    private final String jobId;
043    private BundleJobBean bundleJob;
044    private List<BundleActionBean> bundleActions;
045    private JPAService jpaService = null;
046
047    public BundleKillXCommand(String jobId) {
048        super("bundle_kill", "bundle_kill", 1);
049        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
050    }
051
052    /* (non-Javadoc)
053     * @see org.apache.oozie.command.XCommand#getEntityKey()
054     */
055    @Override
056    public String getEntityKey() {
057        return jobId;
058    }
059
060    @Override
061    public String getKey() {
062        return getName() + "_" + jobId;
063    }
064
065    /* (non-Javadoc)
066     * @see org.apache.oozie.command.XCommand#isLockRequired()
067     */
068    @Override
069    protected boolean isLockRequired() {
070        return true;
071    }
072
073    /* (non-Javadoc)
074     * @see org.apache.oozie.command.XCommand#loadState()
075     */
076    @Override
077    public void loadState() throws CommandException {
078        try {
079            jpaService = Services.get().get(JPAService.class);
080
081            if (jpaService != null) {
082                this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
083                this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
084                LogUtils.setLogInfo(bundleJob, logInfo);
085                super.setJob(bundleJob);
086
087            }
088            else {
089                throw new CommandException(ErrorCode.E0610);
090            }
091        }
092        catch (XException ex) {
093            throw new CommandException(ex);
094        }
095    }
096
097    /* (non-Javadoc)
098     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
099     */
100    @Override
101    protected void verifyPrecondition() throws CommandException, PreconditionException {
102        if (bundleJob.getStatus() == Job.Status.SUCCEEDED
103                || bundleJob.getStatus() == Job.Status.FAILED
104                || bundleJob.getStatus() == Job.Status.DONEWITHERROR
105                || bundleJob.getStatus() == Job.Status.KILLED) {
106            LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
107                    + jobId + ", status = " + bundleJob.getStatus());
108            throw new PreconditionException(ErrorCode.E1020, jobId);
109        }
110    }
111
112    /* (non-Javadoc)
113     * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
114     */
115    @Override
116    public void killChildren() throws CommandException {
117        if (bundleActions != null) {
118            for (BundleActionBean action : bundleActions) {
119                if (action.getCoordId() != null) {
120                    queue(new CoordKillXCommand(action.getCoordId()));
121                    updateBundleAction(action);
122                    LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
123                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
124                } else {
125                    updateBundleAction(action);
126                    LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action
127                            .getStatus(), action.getPending());
128                }
129
130            }
131        }
132        LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
133    }
134
135    /**
136     * Update bundle action
137     *
138     * @param action
139     * @throws CommandException
140     */
141    private void updateBundleAction(BundleActionBean action) {
142        action.setLastModifiedTime(new Date());
143        if (!action.isTerminalStatus()) {
144            action.incrementAndGetPending();
145            action.setStatus(Job.Status.KILLED);
146        }
147        updateList.add(action);
148    }
149
150    /* (non-Javadoc)
151     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
152     */
153    @Override
154    public void notifyParent() {
155    }
156
157    /* (non-Javadoc)
158     * @see org.apache.oozie.command.TransitionXCommand#getJob()
159     */
160    @Override
161    public Job getJob() {
162        return bundleJob;
163    }
164
165    /* (non-Javadoc)
166     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
167     */
168    @Override
169    public void updateJob() {
170        updateList.add(bundleJob);
171    }
172
173    /* (non-Javadoc)
174     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
175     */
176    @Override
177    public void performWrites() throws CommandException {
178        try {
179            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
180        }
181        catch (JPAExecutorException e) {
182            throw new CommandException(e);
183        }
184    }
185
186}