001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.oozie.BundleActionBean;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.client.Job;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.PreconditionException;
029import org.apache.oozie.command.SuspendTransitionXCommand;
030import org.apache.oozie.command.coord.CoordSuspendXCommand;
031import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
032import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.service.JPAService;
036import org.apache.oozie.service.Services;
037import org.apache.oozie.util.InstrumentUtils;
038import org.apache.oozie.util.ParamChecker;
039import org.apache.oozie.util.LogUtils;
040
041public class BundleJobSuspendXCommand extends SuspendTransitionXCommand {
042    private final String jobId;
043    private JPAService jpaService;
044    private List<BundleActionBean> bundleActions;
045    private BundleJobBean bundleJob;
046
047    public BundleJobSuspendXCommand(String id) {
048        super("bundle_suspend", "bundle_suspend", 1);
049        this.jobId = ParamChecker.notEmpty(id, "id");
050    }
051
052    /* (non-Javadoc)
053     * @see org.apache.oozie.command.TransitionXCommand#getJob()
054     */
055    @Override
056    public Job getJob() {
057        return bundleJob;
058    }
059
060    /* (non-Javadoc)
061     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
062     */
063    @Override
064    public void notifyParent() throws CommandException {
065    }
066
067    /* (non-Javadoc)
068     * @see org.apache.oozie.command.TransitionXCommand#setJob(org.apache.oozie.client.Job)
069     */
070    @Override
071    public void setJob(Job job) {
072    }
073
074    /* (non-Javadoc)
075     * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
076     */
077    @Override
078    public void performWrites() throws CommandException {
079        try {
080            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
081        }
082        catch (JPAExecutorException e) {
083            throw new CommandException(e);
084        }
085    }
086
087    /* (non-Javadoc)
088     * @see org.apache.oozie.command.XCommand#getEntityKey()
089     */
090    @Override
091    public String getEntityKey() {
092        return this.jobId;
093    }
094
095    /* (non-Javadoc)
096     * @see org.apache.oozie.command.XCommand#isLockRequired()
097     */
098    @Override
099    protected boolean isLockRequired() {
100        return true;
101    }
102
103    /* (non-Javadoc)
104     * @see org.apache.oozie.command.XCommand#loadState()
105     */
106    @Override
107    protected void loadState() throws CommandException {
108        jpaService = Services.get().get(JPAService.class);
109        if (jpaService == null) {
110            throw new CommandException(ErrorCode.E0610);
111        }
112
113        try {
114            bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
115        }
116        catch (Exception Ex) {
117            throw new CommandException(ErrorCode.E0604, jobId);
118        }
119
120        try {
121            bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
122        }
123        catch (Exception Ex) {
124            throw new CommandException(ErrorCode.E1311, jobId);
125        }
126
127        LogUtils.setLogInfo(bundleJob, logInfo);
128    }
129
130    /* (non-Javadoc)
131     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
132     */
133    @Override
134    protected void verifyPrecondition() throws CommandException, PreconditionException {
135        if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED
136                || bundleJob.getStatus() == Job.Status.KILLED || bundleJob.getStatus() == Job.Status.DONEWITHERROR) {
137            LOG.info("BundleJobSuspendXCommand is not going to execute because job either succeeded, failed, killed, or donewitherror; id = "
138                            + jobId + ", status = " + bundleJob.getStatus());
139            throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString());
140        }
141    }
142
143    /* (non-Javadoc)
144     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
145     */
146    @Override
147    public void updateJob() {
148        InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
149        bundleJob.setSuspendedTime(new Date());
150        bundleJob.setLastModifiedTime(new Date());
151
152        LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
153        updateList.add(bundleJob);
154    }
155
156    @Override
157    public void suspendChildren() throws CommandException {
158        for (BundleActionBean action : this.bundleActions) {
159            if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.RUNNINGWITHERROR
160                    || action.getStatus() == Job.Status.PREP || action.getStatus() == Job.Status.PAUSED
161                    || action.getStatus() == Job.Status.PAUSEDWITHERROR) {
162                // queue a CoordSuspendXCommand
163                if (action.getCoordId() != null) {
164                    queue(new CoordSuspendXCommand(action.getCoordId()));
165                    updateBundleAction(action);
166                    LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
167                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
168                } else {
169                    updateBundleAction(action);
170                    LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
171                            action.getBundleActionId(), action.getStatus(), action.getPending());
172                }
173
174            }
175        }
176        LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
177    }
178
179    private void updateBundleAction(BundleActionBean action) {
180        if (action.getStatus() == Job.Status.PREP) {
181            action.setStatus(Job.Status.PREPSUSPENDED);
182        }
183        else if (action.getStatus() == Job.Status.RUNNING) {
184            action.setStatus(Job.Status.SUSPENDED);
185        }
186        else if (action.getStatus() == Job.Status.RUNNINGWITHERROR) {
187            action.setStatus(Job.Status.SUSPENDEDWITHERROR);
188        }
189        else if (action.getStatus() == Job.Status.PAUSED) {
190            action.setStatus(Job.Status.SUSPENDED);
191        }
192        else if (action.getStatus() == Job.Status.PAUSEDWITHERROR) {
193            action.setStatus(Job.Status.SUSPENDEDWITHERROR);
194        }
195
196        action.incrementAndGetPending();
197        action.setLastModifiedTime(new Date());
198        updateList.add(action);
199    }
200}