001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.client.WorkflowJob;
023import org.apache.oozie.client.SLAEvent.SlaAppType;
024import org.apache.oozie.client.SLAEvent.Status;
025import org.apache.oozie.client.rest.JsonBean;
026import org.apache.oozie.SLAEventBean;
027import org.apache.oozie.WorkflowActionBean;
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.XException;
031import org.apache.oozie.command.CommandException;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
034import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
035import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
036import org.apache.oozie.executor.jpa.JPAExecutorException;
037import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
038import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
039import org.apache.oozie.service.ELService;
040import org.apache.oozie.service.EventHandlerService;
041import org.apache.oozie.service.JPAService;
042import org.apache.oozie.service.Services;
043import org.apache.oozie.service.UUIDService;
044import org.apache.oozie.service.WorkflowStoreService;
045import org.apache.oozie.workflow.WorkflowException;
046import org.apache.oozie.workflow.WorkflowInstance;
047import org.apache.oozie.workflow.lite.KillNodeDef;
048import org.apache.oozie.workflow.lite.NodeDef;
049import org.apache.oozie.util.ELEvaluator;
050import org.apache.oozie.util.InstrumentUtils;
051import org.apache.oozie.util.LogUtils;
052import org.apache.oozie.util.XConfiguration;
053import org.apache.oozie.util.ParamChecker;
054import org.apache.oozie.util.XmlUtils;
055import org.apache.oozie.util.db.SLADbXOperations;
056import org.jdom.Element;
057import java.io.StringReader;
058import java.util.ArrayList;
059import java.util.Date;
060import java.util.List;
061import java.util.Map;
062import org.apache.oozie.client.OozieClient;
063
064@SuppressWarnings("deprecation")
065public class SignalXCommand extends WorkflowXCommand<Void> {
066
067    protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
068
069    private JPAService jpaService = null;
070    private String jobId;
071    private String actionId;
072    private WorkflowJobBean wfJob;
073    private WorkflowActionBean wfAction;
074    private List<JsonBean> updateList = new ArrayList<JsonBean>();
075    private List<JsonBean> insertList = new ArrayList<JsonBean>();
076    private boolean generateEvent = false;
077    private String wfJobErrorCode;
078    private String wfJobErrorMsg;
079
080
081    public SignalXCommand(String name, int priority, String jobId) {
082        super(name, name, priority);
083        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
084    }
085
086    public SignalXCommand(String jobId, String actionId) {
087        this("signal", 1, jobId);
088        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
089    }
090
091    @Override
092    protected boolean isLockRequired() {
093        return true;
094    }
095
096    @Override
097    public String getEntityKey() {
098        return this.jobId;
099    }
100
101    @Override
102    public String getKey() {
103        return getName() + "_" + jobId + "_" + actionId;
104    }
105
106    @Override
107    protected void loadState() throws CommandException {
108        try {
109            jpaService = Services.get().get(JPAService.class);
110            if (jpaService != null) {
111                this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
112                LogUtils.setLogInfo(wfJob, logInfo);
113                if (actionId != null) {
114                    this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
115                    LogUtils.setLogInfo(wfAction, logInfo);
116                }
117            }
118            else {
119                throw new CommandException(ErrorCode.E0610);
120            }
121        }
122        catch (XException ex) {
123            throw new CommandException(ex);
124        }
125    }
126
127    @Override
128    protected void verifyPrecondition() throws CommandException, PreconditionException {
129        if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
130            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
131                throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
132            }
133        }
134        else {
135            throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
136        }
137    }
138
139    @Override
140    protected Void execute() throws CommandException {
141        LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
142        WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
143        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
144        boolean completed = false;
145        boolean skipAction = false;
146        if (wfAction == null) {
147            if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
148                try {
149                    completed = workflowInstance.start();
150                }
151                catch (WorkflowException e) {
152                    throw new CommandException(e);
153                }
154                wfJob.setStatus(WorkflowJob.Status.RUNNING);
155                wfJob.setStartTime(new Date());
156                wfJob.setWorkflowInstance(workflowInstance);
157                generateEvent = true;
158                // 1. Add SLA status event for WF-JOB with status STARTED
159                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
160                        Status.STARTED, SlaAppType.WORKFLOW_JOB);
161                if(slaEvent != null) {
162                    insertList.add(slaEvent);
163                }
164                // 2. Add SLA registration events for all WF_ACTIONS
165                createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
166                        .getGroup(), wfJob.getConf());
167                queue(new NotificationXCommand(wfJob));
168            }
169            else {
170                throw new CommandException(ErrorCode.E0801, wfJob.getId());
171            }
172        }
173        else {
174            WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
175            String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
176                    + ReRunXCommand.TO_SKIP);
177            if (skipVar != null) {
178                skipAction = skipVar.equals("true");
179            }
180            try {
181                completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
182            }
183            catch (WorkflowException e) {
184                throw new CommandException(e);
185            }
186            wfJob.setWorkflowInstance(workflowInstance);
187            wfAction.resetPending();
188            if (!skipAction) {
189                wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
190                queue(new NotificationXCommand(wfJob, wfAction));
191            }
192            updateList.add(wfAction);
193            WorkflowInstance.Status endStatus = workflowInstance.getStatus();
194            if (endStatus != initialStatus) {
195                generateEvent = true;
196            }
197        }
198
199        if (completed) {
200            try {
201                for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
202                    WorkflowActionBean actionToKill;
203
204                    actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
205
206                    actionToKill.setPending();
207                    actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
208                    updateList.add(actionToKill);
209                    queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
210                }
211
212                for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
213                    WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
214                            actionToFailId));
215                    actionToFail.resetPending();
216                    actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
217                    if (wfJobErrorCode != null) {
218                        wfJobErrorCode = actionToFail.getErrorCode();
219                        wfJobErrorMsg = actionToFail.getErrorMessage();
220                    }
221                    queue(new NotificationXCommand(wfJob, actionToFail));
222                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
223                            Status.FAILED, SlaAppType.WORKFLOW_ACTION);
224                    if(slaEvent != null) {
225                        insertList.add(slaEvent);
226                    }
227                    updateList.add(actionToFail);
228                }
229            }
230            catch (JPAExecutorException je) {
231                throw new CommandException(je);
232            }
233
234            wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
235            wfJob.setEndTime(new Date());
236            wfJob.setWorkflowInstance(workflowInstance);
237            Status slaStatus = Status.SUCCEEDED;
238            switch (wfJob.getStatus()) {
239                case SUCCEEDED:
240                    slaStatus = Status.SUCCEEDED;
241                    break;
242                case KILLED:
243                    slaStatus = Status.KILLED;
244                    break;
245                case FAILED:
246                    slaStatus = Status.FAILED;
247                    break;
248                default: // TODO SUSPENDED
249                    break;
250            }
251            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
252                    slaStatus, SlaAppType.WORKFLOW_JOB);
253            if(slaEvent != null) {
254                insertList.add(slaEvent);
255            }
256            queue(new NotificationXCommand(wfJob));
257            if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
258                InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
259            }
260
261            // output message for Kill node
262            if (wfAction != null) { // wfAction could be a no-op job
263                NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
264                if (nodeDef != null && nodeDef instanceof KillNodeDef) {
265                    boolean isRetry = false;
266                    boolean isUserRetry = false;
267                    ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
268                            isUserRetry);
269                    try {
270                        String tmpNodeConf = nodeDef.getConf();
271                        String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
272                        LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
273                                        jobId, actionId, tmpNodeConf, actionConf);
274                        if (wfAction.getErrorCode() != null) {
275                            wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
276                        }
277                        else {
278                            wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
279                        }
280                        updateList.add(wfAction);
281                    }
282                    catch (Exception ex) {
283                        LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
284                        throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
285                    }
286                }
287            }
288
289        }
290        else {
291            for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
292                String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
293                        + ReRunXCommand.TO_SKIP);
294                boolean skipNewAction = false;
295                if (skipVar != null) {
296                    skipNewAction = skipVar.equals("true");
297                }
298                try {
299                    if (skipNewAction) {
300                        WorkflowActionBean oldAction;
301
302                        oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
303
304                        oldAction.setPending();
305                        updateList.add(oldAction);
306
307                        queue(new SignalXCommand(jobId, oldAction.getId()));
308                    }
309                    else {
310                        checkForSuspendNode(newAction);
311                        newAction.setPending();
312                        String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
313                                .getDefinition(), wfJob.getConf());
314                        newAction.setSlaXml(actionSlaXml);
315                        insertList.add(newAction);
316                        LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
317                        queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
318                    }
319                }
320                catch (JPAExecutorException je) {
321                    throw new CommandException(je);
322                }
323            }
324        }
325
326        try {
327            wfJob.setLastModifiedTime(new Date());
328            updateList.add(wfJob);
329            // call JPAExecutor to do the bulk writes
330            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
331            if (generateEvent && EventHandlerService.isEnabled()) {
332                generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
333            }
334        }
335        catch (JPAExecutorException je) {
336            throw new CommandException(je);
337        }
338        LOG.debug(
339                "Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
340        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
341            // update coordinator action
342            new CoordActionUpdateXCommand(wfJob).call();    //Note: Called even if wf is not necessarily instantiated by coordinator
343            new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
344        }
345        LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
346        return null;
347    }
348
349    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
350        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
351        for (Map.Entry<String, String> entry : conf) {
352            eval.setVariable(entry.getKey(), entry.getValue());
353        }
354        return eval;
355    }
356
357    @SuppressWarnings("unchecked")
358    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
359        String slaXml = null;
360        try {
361            Element eWfJob = XmlUtils.parseXml(wfXml);
362            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
363                if (action.getAttributeValue("name").equals(actionName) == false) {
364                    continue;
365                }
366                Element eSla = XmlUtils.getSLAElement(action);
367                if (eSla != null) {
368                    slaXml = XmlUtils.prettyPrint(eSla).toString();
369                    break;
370                }
371            }
372        }
373        catch (Exception e) {
374            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
375        }
376        return slaXml;
377    }
378
379    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
380        String slaXml = null;
381        try {
382            ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
383            slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
384        }
385        catch (Exception e) {
386            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
387        }
388        return slaXml;
389    }
390
391    @SuppressWarnings("unchecked")
392    private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
393            throws CommandException {
394        try {
395            Element eWfJob = XmlUtils.parseXml(wfXml);
396            Configuration conf = new XConfiguration(new StringReader(strConf));
397            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
398                Element eSla = XmlUtils.getSLAElement(action);
399                if (eSla != null) {
400                    String slaXml = resolveSla(eSla, conf);
401                    eSla = XmlUtils.parseXml(slaXml);
402                    String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
403                            action.getAttributeValue("name") + "");
404                    SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
405                            SlaAppType.WORKFLOW_ACTION, user, group);
406                    if(slaEvent != null) {
407                        insertList.add(slaEvent);
408                    }
409                }
410            }
411        }
412        catch (Exception e) {
413            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
414        }
415
416    }
417
418    private void checkForSuspendNode(WorkflowActionBean newAction) {
419        try {
420            XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
421            String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
422            if (values != null) {
423                if (values.length == 1 && values[0].equals("*")) {
424                    LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), wfJob.getId());
425                    queue(new SuspendXCommand(jobId));
426                }
427                else {
428                    for (String suspendPoint : values) {
429                        if (suspendPoint.equals(newAction.getName())) {
430                            LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
431                                    wfJob.getId());
432                            queue(new SuspendXCommand(jobId));
433                            break;
434                        }
435                    }
436                }
437            }
438        }
439        catch (IOException ex) {
440            LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
441        }
442    }
443
444}