001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.oozie.BundleActionBean;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.client.Job;
027import org.apache.oozie.command.CommandException;
028import org.apache.oozie.command.PreconditionException;
029import org.apache.oozie.command.ResumeTransitionXCommand;
030import org.apache.oozie.command.coord.CoordResumeXCommand;
031import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
032import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.service.JPAService;
036import org.apache.oozie.service.Services;
037import org.apache.oozie.util.InstrumentUtils;
038import org.apache.oozie.util.LogUtils;
039import org.apache.oozie.util.ParamChecker;
040
041public class BundleJobResumeXCommand extends ResumeTransitionXCommand {
042
043    private final String bundleId;
044    private JPAService jpaService = null;
045    private BundleJobBean bundleJob;
046    private List<BundleActionBean> bundleActions;
047
048    /**
049     * Constructor to create the Bundle Resume Command.
050     *
051     * @param jobId : Bundle Id
052     */
053    public BundleJobResumeXCommand(String jobId) {
054        super("bundle_resume", "bundle_resume", 1);
055        this.bundleId = ParamChecker.notNull(jobId, "BundleId");
056    }
057
058    /* (non-Javadoc)
059     * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
060     */
061    @Override
062    public void resumeChildren() {
063        for (BundleActionBean action : bundleActions) {
064            if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) {
065                // queue a CoordResumeXCommand
066                if (action.getCoordId() != null) {
067                    queue(new CoordResumeXCommand(action.getCoordId()));
068                    updateBundleAction(action);
069                    LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
070                                    action.getBundleActionId(), action.getStatus(), action.getPending(), action
071                                            .getCoordId());
072                }
073                else {
074                    updateBundleAction(action);
075                    LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
076                                    action.getBundleActionId(), action.getStatus(), action.getPending());
077                }
078            }
079        }
080        LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
081    }
082
083    private void updateBundleAction(BundleActionBean action) {
084        if (action.getStatus() == Job.Status.PREPSUSPENDED) {
085            action.setStatus(Job.Status.PREP);
086        }
087        else if (action.getStatus() == Job.Status.SUSPENDED) {
088            action.setStatus(Job.Status.RUNNING);
089        }
090        else if (action.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
091            action.setStatus(Job.Status.RUNNINGWITHERROR);
092        }
093        action.incrementAndGetPending();
094        action.setLastModifiedTime(new Date());
095        updateList.add(action);
096    }
097
098    /* (non-Javadoc)
099     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
100     */
101    @Override
102    public void notifyParent() throws CommandException {
103
104    }
105
106    /* (non-Javadoc)
107     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
108     */
109    @Override
110    public void updateJob() {
111        InstrumentUtils.incrJobCounter("bundle_resume", 1, null);
112        bundleJob.setSuspendedTime(null);
113        bundleJob.setLastModifiedTime(new Date());
114        LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
115        updateList.add(bundleJob);
116    }
117
118    /* (non-Javadoc)
119     * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
120     */
121    @Override
122    public void performWrites() throws CommandException {
123        try {
124            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
125        }
126        catch (JPAExecutorException e) {
127            throw new CommandException(e);
128        }
129    }
130
131    /* (non-Javadoc)
132     * @see org.apache.oozie.command.XCommand#getEntityKey()
133     */
134    @Override
135    public String getEntityKey() {
136        return bundleId;
137    }
138
139    /* (non-Javadoc)
140     * @see org.apache.oozie.command.XCommand#isLockRequired()
141     */
142    @Override
143    protected boolean isLockRequired() {
144        return true;
145    }
146
147    /* (non-Javadoc)
148     * @see org.apache.oozie.command.XCommand#loadState()
149     */
150    @Override
151    protected void loadState() throws CommandException {
152        jpaService = Services.get().get(JPAService.class);
153        if (jpaService == null) {
154            throw new CommandException(ErrorCode.E0610);
155        }
156
157        try {
158            bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
159            bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(bundleId));
160        }
161        catch (Exception Ex) {
162            throw new CommandException(ErrorCode.E0604, bundleId);
163        }
164
165        LogUtils.setLogInfo(bundleJob, logInfo);
166    }
167
168    /* (non-Javadoc)
169     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
170     */
171    @Override
172    protected void verifyPrecondition() throws CommandException, PreconditionException {
173        if (bundleJob.getStatus() != Job.Status.SUSPENDED && bundleJob.getStatus() != Job.Status.SUSPENDEDWITHERROR && bundleJob.getStatus() != Job.Status.PREPSUSPENDED) {
174            throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - "
175                    + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state " + bundleId);
176        }
177    }
178
179    /* (non-Javadoc)
180     * @see org.apache.oozie.command.TransitionXCommand#getJob()
181     */
182    @Override
183    public Job getJob() {
184        return bundleJob;
185    }
186}