001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.oozie.CoordinatorActionBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.SLAEventBean;
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.XException;
030import org.apache.oozie.service.EventHandlerService;
031import org.apache.oozie.service.JPAService;
032import org.apache.oozie.service.Services;
033import org.apache.oozie.util.LogUtils;
034import org.apache.oozie.util.db.SLADbOperations;
035import org.apache.oozie.client.CoordinatorAction;
036import org.apache.oozie.client.WorkflowJob;
037import org.apache.oozie.client.SLAEvent.SlaAppType;
038import org.apache.oozie.client.SLAEvent.Status;
039import org.apache.oozie.client.rest.JsonBean;
040import org.apache.oozie.command.CommandException;
041import org.apache.oozie.command.PreconditionException;
042import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
043import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
044import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
045import org.apache.oozie.executor.jpa.JPAExecutorException;
046
047@SuppressWarnings("deprecation")
048public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
049    private WorkflowJobBean workflow;
050    private CoordinatorActionBean coordAction = null;
051    private CoordinatorJobBean coordJob;
052    private JPAService jpaService = null;
053    private int maxRetries = 1;
054    private List<JsonBean> updateList = new ArrayList<JsonBean>();
055    private List<JsonBean> insertList = new ArrayList<JsonBean>();
056
057    public CoordActionUpdateXCommand(WorkflowJobBean workflow) {
058        super("coord-action-update", "coord-action-update", 1);
059        this.workflow = workflow;
060    }
061
062    public CoordActionUpdateXCommand(WorkflowJobBean workflow, int maxRetries) {
063        super("coord-action-update", "coord-action-update", 1);
064        this.workflow = workflow;
065        this.maxRetries = maxRetries;
066    }
067
068    @Override
069    protected Void execute() throws CommandException {
070        try {
071            LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
072            Status slaStatus = null;
073            if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
074                coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
075                coordAction.setPending(0);
076                slaStatus = Status.SUCCEEDED;
077            }
078            else if (workflow.getStatus() == WorkflowJob.Status.FAILED) {
079                coordAction.setStatus(CoordinatorAction.Status.FAILED);
080                coordAction.setPending(0);
081                slaStatus = Status.FAILED;
082            }
083            else if (workflow.getStatus() == WorkflowJob.Status.KILLED) {
084                coordAction.setStatus(CoordinatorAction.Status.KILLED);
085                coordAction.setPending(0);
086                slaStatus = Status.KILLED;
087            }
088            else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
089                coordAction.setStatus(CoordinatorAction.Status.SUSPENDED);
090                coordAction.decrementAndGetPending();
091            }
092            else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
093                // resume workflow job and update coord action accordingly
094                coordAction.setStatus(CoordinatorAction.Status.RUNNING);
095                coordAction.decrementAndGetPending();
096            }
097            else {
098                LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
099                // update lastModifiedTime
100                coordAction.setLastModifiedTime(new Date());
101                updateList.add(coordAction);
102                jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
103                // TODO - Uncomment this when bottom up rerun can change terminal state
104                /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
105                if (!coordJob.isPending()) {
106                    coordJob.setPending();
107                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
108                }*/
109                return null;
110            }
111
112            LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
113                    + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
114
115            coordAction.setLastModifiedTime(new Date());
116            updateList.add(coordAction);
117            // TODO - Uncomment this when bottom up rerun can change terminal state
118            /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
119            if (!coordJob.isPending()) {
120                coordJob.setPending();
121                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
122                LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
123            }*/
124            if (slaStatus != null) {
125                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
126                        SlaAppType.COORDINATOR_ACTION, LOG);
127                if(slaEvent != null) {
128                    insertList.add(slaEvent);
129                }
130            }
131            if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED
132                    && workflow.getStatus() != WorkflowJob.Status.RUNNING) {
133                queue(new CoordActionReadyXCommand(coordAction.getJobId()));
134            }
135
136            jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
137            if (EventHandlerService.isEnabled()) {
138                generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
139            }
140
141            LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
142        }
143        catch (XException ex) {
144            LOG.warn("CoordActionUpdate Failed ", ex.getMessage());
145            throw new CommandException(ex);
146        }
147        return null;
148    }
149
150    /* (non-Javadoc)
151     * @see org.apache.oozie.command.XCommand#getEntityKey()
152     */
153    @Override
154    public String getEntityKey() {
155        return coordAction.getJobId();
156    }
157
158    /* (non-Javadoc)
159     * @see org.apache.oozie.command.XCommand#isLockRequired()
160     */
161    @Override
162    protected boolean isLockRequired() {
163        return true;
164    }
165
166    /* (non-Javadoc)
167     * @see org.apache.oozie.command.XCommand#eagerLoadState()
168     */
169    @Override
170    protected void eagerLoadState() throws CommandException {
171        jpaService = Services.get().get(JPAService.class);
172        if (jpaService == null) {
173            throw new CommandException(ErrorCode.E0610);
174        }
175
176        int retries = 0;
177        while (retries++ < maxRetries) {
178            try {
179                coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId()));
180                if (coordAction != null) {
181                    coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
182                            coordAction.getJobId()));
183                    break;
184                }
185
186                if (retries < maxRetries) {
187                    Thread.sleep(500);
188                }
189            }
190            catch (JPAExecutorException je) {
191                LOG.warn("Could not load coord action {0}", je.getMessage(), je);
192            }
193            catch (InterruptedException ex) {
194                LOG.warn("Retry to load coord action is interrupted {0}", ex.getMessage(), ex);
195            }
196        }
197
198        if (coordAction != null) {
199            LogUtils.setLogInfo(coordAction, logInfo);
200        }
201    }
202
203    /* (non-Javadoc)
204     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
205     */
206    @Override
207    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
208        if (coordAction == null) {
209            throw new PreconditionException(ErrorCode.E1100, ", coord action is null");
210        }
211    }
212
213    /* (non-Javadoc)
214     * @see org.apache.oozie.command.XCommand#loadState()
215     */
216    @Override
217    protected void loadState() throws CommandException {
218        eagerLoadState();
219    }
220
221    /* (non-Javadoc)
222     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
223     */
224    @Override
225    protected void verifyPrecondition() throws CommandException, PreconditionException {
226
227        // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated.
228        if (workflow.getStatus() == WorkflowJob.Status.RUNNING
229                && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) {
230            // update lastModifiedTime
231            coordAction.setLastModifiedTime(new Date());
232            try {
233                jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(coordAction));
234            }
235            catch (JPAExecutorException je) {
236                throw new CommandException(je);
237            }
238            throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false");
239        }
240    }
241
242}