001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.util.Date;
021
022import org.apache.oozie.BundleActionBean;
023import org.apache.oozie.CoordinatorJobBean;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.XException;
026import org.apache.oozie.client.CoordinatorJob;
027import org.apache.oozie.client.Job;
028import org.apache.oozie.command.CommandException;
029import org.apache.oozie.command.PreconditionException;
030import org.apache.oozie.command.StatusUpdateXCommand;
031import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor;
032import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor;
033import org.apache.oozie.executor.jpa.JPAExecutorException;
034import org.apache.oozie.service.JPAService;
035import org.apache.oozie.service.Services;
036
037/**
038 * The command to update Bundle status
039 */
040public class BundleStatusUpdateXCommand extends StatusUpdateXCommand {
041    private final CoordinatorJobBean coordjob;
042    private JPAService jpaService = null;
043    private BundleActionBean bundleaction;
044    private final Job.Status prevStatus;
045
046    /**
047     * The constructor for class {@link BundleStatusUpdateXCommand}
048     *
049     * @param coordjob coordinator job bean
050     * @param prevStatus coordinator job old status
051     */
052    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) {
053        super("BundleStatusUpdate", "BundleStatusUpdate", 1);
054        this.coordjob = coordjob;
055        this.prevStatus = prevStatus;
056    }
057
058    /* (non-Javadoc)
059     * @see org.apache.oozie.command.XCommand#execute()
060     */
061    @Override
062    protected Void execute() throws CommandException {
063        try {
064            LOG.debug("STARTED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId()
065                    + " coord job ID: " + coordjob.getId() + " coord Status " + coordjob.getStatus());
066            Job.Status coordCurrentStatus = coordjob.getStatus();
067            // The status of bundle action should not be updated if the bundle action is in terminal state
068            // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle
069            // should not be changed.
070            if (bundleaction.getCoordId() != null || !bundleaction.isTerminalStatus()) {
071                bundleaction.setStatus(coordCurrentStatus);
072            }
073            if (bundleaction.isPending()) {
074                bundleaction.decrementAndGetPending();
075            }
076            // TODO - Uncomment this when bottom up rerun can change terminal state
077            /*BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleaction.getBundleId()));
078            if (!bundleJob.isPending()) {
079                bundleJob.setPending();
080                jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
081                LOG.info("Updated bundle job [{0}] pending to true", bundleaction.getBundleId());
082            }*/
083
084            bundleaction.setLastModifiedTime(new Date());
085            bundleaction.setCoordId(coordjob.getId());
086            jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
087            if (bundleaction.getCoordId() != null) {
088                LOG.info("Updated bundle action [{0}] from prev status [{1}] to current coord status [{2}], and new bundle action pending [{3}]", bundleaction
089                    .getBundleActionId(), bundleaction.getStatus(), coordCurrentStatus, bundleaction.getPending());
090            }
091            else {
092                LOG.info("Updated Bundle action [{0}], status = [{1}], pending = [{2}]", bundleaction.getBundleActionId(),
093                        bundleaction.getStatus(), bundleaction.getPending());
094            }
095            LOG.debug("ENDED BundleStatusUpdateXCommand with bundle id : " + coordjob.getBundleId() + " coord job ID: "
096                    + coordjob.getId() + " coord Status " + coordjob.getStatus());
097        }
098        catch (Exception ex) {
099            throw new CommandException(ErrorCode.E1309, bundleaction.getBundleId(), bundleaction.getCoordName());
100        }
101        return null;
102    }
103
104    /* (non-Javadoc)
105     * @see org.apache.oozie.command.XCommand#getEntityKey()
106     */
107    @Override
108    public String getEntityKey() {
109        return bundleaction.getBundleId();
110    }
111
112    /* (non-Javadoc)
113     * @see org.apache.oozie.command.XCommand#isLockRequired()
114     */
115    @Override
116    protected boolean isLockRequired() {
117        return true;
118    }
119
120    /* (non-Javadoc)
121     * @see org.apache.oozie.command.XCommand#eagerLoadState()
122     */
123    @Override
124    protected void eagerLoadState() throws CommandException{
125        loadState();
126    }
127
128    /* (non-Javadoc)
129     * @see org.apache.oozie.command.XCommand#loadState()
130     */
131    @Override
132    protected void loadState() throws CommandException {
133        try {
134            if (jpaService == null) {
135                jpaService = Services.get().get(JPAService.class);
136            }
137
138            if (jpaService != null) {
139                this.bundleaction = jpaService.execute(new BundleActionGetJPAExecutor(coordjob.getBundleId(), coordjob
140                        .getAppName()));
141            }
142            else {
143                throw new CommandException(ErrorCode.E0610);
144            }
145        }
146        catch (XException ex) {
147            throw new CommandException(ex);
148        }
149    }
150
151    /* (non-Javadoc)
152     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
153     */
154    @Override
155    protected void verifyPrecondition() throws CommandException, PreconditionException {
156        if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) {
157            // pending should be decremented only if status of coord job and bundle action is same
158            // e.g if bundle is killed and coord job is running, then pending should not be false
159            // to allow recovery service to pick and kill the coord job
160            if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) {
161                bundleaction.decrementAndGetPending();
162            }
163            bundleaction.setLastModifiedTime(new Date());
164            try {
165                jpaService.execute(new BundleActionUpdateJPAExecutor(bundleaction));
166            }
167            catch (JPAExecutorException je) {
168                throw new CommandException(je);
169            }
170            LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], decrement pending so new pending = [{3}]",
171                            bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
172                            bundleaction.getPending());
173            throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString());
174        }
175        else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {
176            LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], pending = [{3}] and bundle not yet updated with coord-id",
177                    bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(),
178                    bundleaction.getPending());
179        }
180    }
181
182}