001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.io.IOException;
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.WorkflowActionBean;
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.action.control.EndActionExecutor;
030import org.apache.oozie.action.control.ForkActionExecutor;
031import org.apache.oozie.action.control.JoinActionExecutor;
032import org.apache.oozie.action.control.KillActionExecutor;
033import org.apache.oozie.action.control.StartActionExecutor;
034import org.apache.oozie.client.WorkflowJob;
035import org.apache.oozie.client.rest.JsonBean;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.PreconditionException;
038import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
039import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
040import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
041import org.apache.oozie.executor.jpa.JPAExecutorException;
042import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
043import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
044import org.apache.oozie.service.EventHandlerService;
045import org.apache.oozie.service.HadoopAccessorException;
046import org.apache.oozie.service.JPAService;
047import org.apache.oozie.service.Services;
048import org.apache.oozie.util.InstrumentUtils;
049import org.apache.oozie.util.LogUtils;
050import org.apache.oozie.util.ParamChecker;
051import org.apache.oozie.workflow.WorkflowException;
052import org.apache.oozie.workflow.WorkflowInstance;
053import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054
055public class ResumeXCommand extends WorkflowXCommand<Void> {
056
057    private String id;
058    private JPAService jpaService = null;
059    private WorkflowJobBean workflow = null;
060    private List<JsonBean> updateList = new ArrayList<JsonBean>();
061
062    public ResumeXCommand(String id) {
063        super("resume", "resume", 1);
064        this.id = ParamChecker.notEmpty(id, "id");
065    }
066
067    @Override
068    protected Void execute() throws CommandException {
069        try {
070            if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
071                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
072                workflow.getWorkflowInstance().resume();
073                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
074                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.RUNNING);
075                workflow.setWorkflowInstance(wfInstance);
076                workflow.setStatus(WorkflowJob.Status.RUNNING);
077
078                //for (WorkflowActionBean action : store.getActionsForWorkflow(id, false)) {
079                for (WorkflowActionBean action : jpaService.execute(new WorkflowJobGetActionsJPAExecutor(id))) {
080
081                    // Set pending flag to true for the actions that are START_RETRY or
082                    // START_MANUAL or END_RETRY or END_MANUAL
083                    if (action.isRetryOrManual()) {
084                        action.setPendingOnly();
085                        updateList.add(action);
086                    }
087
088                    if (action.isPending()) {
089                        if (action.getStatus() == WorkflowActionBean.Status.PREP
090                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
091                            // When resuming a workflow that was programatically suspended (via ActionCheckXCommand) because of
092                            // a repeated transient error, we have to clean up the action dir
093                            if (!action.getType().equals(StartActionExecutor.TYPE) &&       // The control actions have invalid
094                                !action.getType().equals(ForkActionExecutor.TYPE) &&        // action dir paths because they
095                                !action.getType().equals(JoinActionExecutor.TYPE) &&        // contain ":" (colons)
096                                !action.getType().equals(KillActionExecutor.TYPE) &&
097                                !action.getType().equals(EndActionExecutor.TYPE)) {
098                                ActionExecutorContext context =
099                                        new ActionXCommand.ActionExecutorContext(workflow, action, false, false);
100                                if (context.getAppFileSystem().exists(context.getActionDir())) {
101                                    context.getAppFileSystem().delete(context.getActionDir(), true);
102                                }
103                            }
104                            queue(new ActionStartXCommand(action.getId(), action.getType()));
105                        }
106                        else {
107                            if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
108                                Date nextRunTime = action.getPendingAge();
109                                queue(new ActionStartXCommand(action.getId(), action.getType()),
110                                              nextRunTime.getTime() - System.currentTimeMillis());
111                            }
112                            else {
113                                if (action.getStatus() == WorkflowActionBean.Status.DONE
114                                        || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
115                                    queue(new ActionEndXCommand(action.getId(), action.getType()));
116                                }
117                                else {
118                                    if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
119                                        Date nextRunTime = action.getPendingAge();
120                                        queue(new ActionEndXCommand(action.getId(), action.getType()),
121                                                      nextRunTime.getTime() - System.currentTimeMillis());
122                                    }
123                                }
124                            }
125                        }
126
127                    }
128                }
129
130                workflow.setLastModifiedTime(new Date());
131                updateList.add(workflow);
132                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
133                if (EventHandlerService.isEnabled()) {
134                    generateEvent(workflow);
135                }
136                queue(new NotificationXCommand(workflow));
137            }
138            return null;
139        }
140        catch (WorkflowException ex) {
141            throw new CommandException(ex);
142        }
143        catch (JPAExecutorException e) {
144            throw new CommandException(e);
145        }
146        catch (HadoopAccessorException e) {
147            throw new CommandException(e);
148        }
149        catch (IOException e) {
150            throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
151        }
152        catch (URISyntaxException e) {
153            throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
154        }
155        finally {
156            // update coordinator action
157            new CoordActionUpdateXCommand(workflow).call();
158        }
159    }
160
161    @Override
162    public String getEntityKey() {
163        return id;
164    }
165
166    @Override
167    public String getKey() {
168        return getName() + "_" + id;
169    }
170
171    @Override
172    protected boolean isLockRequired() {
173        return true;
174    }
175
176    @Override
177    protected void loadState() throws CommandException {
178        jpaService = Services.get().get(JPAService.class);
179        if (jpaService == null) {
180            throw new CommandException(ErrorCode.E0610);
181        }
182        try {
183            workflow = jpaService.execute(new WorkflowJobGetJPAExecutor(id));
184        }
185        catch (JPAExecutorException e) {
186            throw new CommandException(e);
187        }
188        LogUtils.setLogInfo(workflow, logInfo);
189    }
190
191    @Override
192    protected void verifyPrecondition() throws CommandException, PreconditionException {
193        if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
194            throw new PreconditionException(ErrorCode.E1100, "workflow's status is " + workflow.getStatusStr() + " is not SUSPENDED");
195        }
196    }
197}