001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.workflow.lite;
019
020import org.apache.oozie.service.XLogService;
021import org.apache.oozie.service.DagXLogInfoService;
022import org.apache.oozie.client.OozieClient;
023import org.apache.hadoop.io.Writable;
024import org.apache.hadoop.util.ReflectionUtils;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.workflow.WorkflowApp;
027import org.apache.oozie.workflow.WorkflowException;
028import org.apache.oozie.workflow.WorkflowInstance;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.util.XConfiguration;
032import org.apache.oozie.ErrorCode;
033
034import java.io.DataInput;
035import java.io.DataOutput;
036import java.io.IOException;
037import java.io.ByteArrayOutputStream;
038import java.io.ByteArrayInputStream;
039import java.util.ArrayList;
040import java.util.HashMap;
041import java.util.List;
042import java.util.Map;
043
044//TODO javadoc
045public class LiteWorkflowInstance implements Writable, WorkflowInstance {
046    private static final String TRANSITION_TO = "transition.to";
047
048    private XLog log;
049
050    private static String PATH_SEPARATOR = "/";
051    private static String ROOT = PATH_SEPARATOR;
052    private static String TRANSITION_SEPARATOR = "#";
053
054    private static class NodeInstance {
055        String nodeName;
056        boolean started = false;
057
058        private NodeInstance(String nodeName) {
059            this.nodeName = nodeName;
060        }
061    }
062
063    private class Context implements NodeHandler.Context {
064        private NodeDef nodeDef;
065        private String executionPath;
066        private String exitState;
067        private Status status = Status.RUNNING;
068
069        private Context(NodeDef nodeDef, String executionPath, String exitState) {
070            this.nodeDef = nodeDef;
071            this.executionPath = executionPath;
072            this.exitState = exitState;
073        }
074
075        public NodeDef getNodeDef() {
076            return nodeDef;
077        }
078
079        public String getExecutionPath() {
080            return executionPath;
081        }
082
083        public String getParentExecutionPath(String executionPath) {
084            return LiteWorkflowInstance.getParentPath(executionPath);
085        }
086
087        public String getSignalValue() {
088            return exitState;
089        }
090
091        public String createExecutionPath(String name) {
092            return LiteWorkflowInstance.createChildPath(executionPath, name);
093        }
094
095        public String createFullTransition(String executionPath, String transition) {
096            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
097        }
098
099        public void deleteExecutionPath() {
100            if (!executionPaths.containsKey(executionPath)) {
101                throw new IllegalStateException();
102            }
103            executionPaths.remove(executionPath);
104            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
105        }
106
107        public void failJob() {
108            status = Status.FAILED;
109        }
110
111        public void killJob() {
112            status = Status.KILLED;
113        }
114
115        public void completeJob() {
116            status = Status.SUCCEEDED;
117        }
118
119        @Override
120        public Object getTransientVar(String name) {
121            return LiteWorkflowInstance.this.getTransientVar(name);
122        }
123
124        @Override
125        public String getVar(String name) {
126            return LiteWorkflowInstance.this.getVar(name);
127        }
128
129        @Override
130        public void setTransientVar(String name, Object value) {
131            LiteWorkflowInstance.this.setTransientVar(name, value);
132        }
133
134        @Override
135        public void setVar(String name, String value) {
136            LiteWorkflowInstance.this.setVar(name, value);
137        }
138
139        @Override
140        public LiteWorkflowInstance getProcessInstance() {
141            return LiteWorkflowInstance.this;
142        }
143
144    }
145
    // Workflow definition executed by this instance; serialized together with the instance.
    private LiteWorkflowApp def;
    // Job configuration; written as XML by write() and rebuilt by readFields().
    private Configuration conf;
    // Identifier of this instance; basis of equals() and hashCode().
    private String instanceId;
    // Current job status; null until set by the public constructor or readFields().
    private Status status;
    // Execution path -> node currently placed on that path.
    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
    // Variables that are serialized with the instance.
    private Map<String, String> persistentVars = new HashMap<String, String>();
    // Variables that are NOT serialized by write().
    private Map<String, Object> transientVars = new HashMap<String, Object>();
153
154    protected LiteWorkflowInstance() {
155        log = XLog.getLog(getClass());
156    }
157
158    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
159        this();
160        this.def = ParamChecker.notNull(def, "def");
161        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
162        this.conf = ParamChecker.notNull(conf, "conf");
163        refreshLog();
164        status = Status.PREP;
165    }
166
167    public synchronized boolean start() throws WorkflowException {
168        if (status != Status.PREP) {
169            throw new WorkflowException(ErrorCode.E0719);
170        }
171        log.debug(XLog.STD, "Starting job");
172        status = Status.RUNNING;
173        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
174        return signal(ROOT, StartNodeDef.START);
175    }
176
177    //todo if suspended store signal and use when resuming
178
179    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
180        ParamChecker.notEmpty(executionPath, "executionPath");
181        ParamChecker.notNull(signalValue, "signalValue");
182        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
183        if (status != Status.RUNNING) {
184            throw new WorkflowException(ErrorCode.E0716);
185        }
186        NodeInstance nodeJob = executionPaths.get(executionPath);
187        if (nodeJob == null) {
188            status = Status.FAILED;
189            log.error("invalid execution path [{0}]", executionPath);
190        }
191        NodeDef nodeDef = null;
192        if (!status.isEndState()) {
193            nodeDef = def.getNode(nodeJob.nodeName);
194            if (nodeDef == null) {
195                status = Status.FAILED;
196                log.error("invalid transition [{0}]", nodeJob.nodeName);
197            }
198        }
199        if (!status.isEndState()) {
200            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
201            boolean exiting = true;
202
203            Context context = new Context(nodeDef, executionPath, signalValue);
204            if (!nodeJob.started) {
205                try {
206                    nodeHandler.loopDetection(context);
207                    exiting = nodeHandler.enter(context);
208                    nodeJob.started = true;
209                }
210                catch (WorkflowException ex) {
211                    status = Status.FAILED;
212                    throw ex;
213                }
214            }
215
216            if (exiting) {
217                List<String> pathsToStart = new ArrayList<String>();
218                List<String> fullTransitions;
219                try {
220                    fullTransitions = nodeHandler.multiExit(context);
221                    int last = fullTransitions.size() - 1;
222                    // TEST THIS
223                    if (last >= 0) {
224                        String transitionTo = getTransitionNode(fullTransitions.get(last));
225                        if (nodeDef instanceof ForkNodeDef) {
226                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
227                                                // transitions are hardcoded in the WF app.
228                        }
229                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
230                                           transitionTo);
231                    }
232                }
233                catch (WorkflowException ex) {
234                    status = Status.FAILED;
235                    throw ex;
236                }
237
238                if (context.status == Status.KILLED) {
239                    status = Status.KILLED;
240                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
241                }
242                else {
243                    if (context.status == Status.FAILED) {
244                        status = Status.FAILED;
245                        log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
246                    }
247                    else {
248                        if (context.status == Status.SUCCEEDED) {
249                            status = Status.SUCCEEDED;
250                            log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
251                        }
252/*
253                else if (context.status == Status.SUSPENDED) {
254                    status = Status.SUSPENDED;
255                    log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
256                }
257*/
258                        else {
259                            for (String fullTransition : fullTransitions) {
260                                // this is the whole trick for forking, we need the
261                                // executionpath and the transition
262                                // in the case of no forking last element of
263                                // executionpath is different from transition
264                                // in the case of forking they are the same
265
266                                log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
267                                          fullTransition);
268
269                                String execPathFromTransition = getExecutionPath(fullTransition);
270                                String transition = getTransitionNode(fullTransition);
271                                def.validateTransition(nodeJob.nodeName, transition);
272
273                                NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
274                                if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
275                                    // TODO explain this IF better
276                                    // If the WfJob is signaled with the parent
277                                    // execution executionPath again
278                                    // The Fork node will execute again.. and replace
279                                    // the Node WorkflowJobBean
280                                    // so this is required to prevent that..
281                                    // Question : Should we throw an error in this case
282                                    // ??
283                                    executionPaths.put(execPathFromTransition, new NodeInstance(transition));
284                                    pathsToStart.add(execPathFromTransition);
285                                }
286
287                            }
288                            // signal all new synch transitions
289                            for (String pathToStart : pathsToStart) {
290                                signal(pathToStart, "::synch::");
291                            }
292                        }
293                    }
294                }
295            }
296        }
297        if (status.isEndState()) {
298            if (status == Status.FAILED) {
299                List<String> failedNodes = terminateNodes(status);
300                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
301                        .size());
302            }
303            else {
304                List<String> killedNodes = terminateNodes(Status.KILLED);
305
306                if (killedNodes.size() > 1) {
307                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
308                            .size());
309                }
310            }
311        }
312        return status.isEndState();
313    }
314
315    /**
316     * Get NodeDef from workflow instance
317     * @param executionPath execution path
318     * @return node def
319     */
320    public NodeDef getNodeDef(String executionPath) {
321        NodeInstance nodeJob = executionPaths.get(executionPath);
322        NodeDef nodeDef = null;
323        if (nodeJob == null) {
324            log.error("invalid execution path [{0}]", executionPath);
325        }
326        else {
327            nodeDef = def.getNode(nodeJob.nodeName);
328            if (nodeDef == null) {
329                log.error("invalid transition [{0}]", nodeJob.nodeName);
330            }
331        }
332        return nodeDef;
333    }
334
335    public synchronized void fail(String nodeName) throws WorkflowException {
336        if (status.isEndState()) {
337            throw new WorkflowException(ErrorCode.E0718);
338        }
339        String failedNode = failNode(nodeName);
340        if (failedNode != null) {
341            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
342        }
343        else {
344            //TODO failed attempting to fail the action. EXCEPTION
345        }
346        List<String> killedNodes = killNodes();
347        if (killedNodes.size() > 1) {
348            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
349        }
350        status = Status.FAILED;
351    }
352
353    public synchronized void kill() throws WorkflowException {
354        if (status.isEndState()) {
355            throw new WorkflowException(ErrorCode.E0718);
356        }
357        log.debug(XLog.STD, "Killing job");
358        List<String> killedNodes = killNodes();
359        if (killedNodes.size() > 1) {
360            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
361        }
362        status = Status.KILLED;
363    }
364
365    public synchronized void suspend() throws WorkflowException {
366        if (status != Status.RUNNING) {
367            throw new WorkflowException(ErrorCode.E0716);
368        }
369        log.debug(XLog.STD, "Suspending job");
370        this.status = Status.SUSPENDED;
371    }
372
373    public boolean isSuspended() {
374        return (status == Status.SUSPENDED);
375    }
376
377    public synchronized void resume() throws WorkflowException {
378        if (status != Status.SUSPENDED) {
379            throw new WorkflowException(ErrorCode.E0717);
380        }
381        log.debug(XLog.STD, "Resuming job");
382        status = Status.RUNNING;
383    }
384
385    public void setVar(String name, String value) {
386        if (value != null) {
387            persistentVars.put(name, value);
388        }
389        else {
390            persistentVars.remove(name);
391        }
392    }
393
394    @Override
395    public Map<String, String> getAllVars() {
396        return persistentVars;
397    }
398
399    @Override
400    public void setAllVars(Map<String, String> varMap) {
401        persistentVars.putAll(varMap);
402    }
403
404    public String getVar(String name) {
405        return persistentVars.get(name);
406    }
407
408
409    public void setTransientVar(String name, Object value) {
410        if (value != null) {
411            transientVars.put(name, value);
412        }
413        else {
414            transientVars.remove(name);
415        }
416    }
417
418    public boolean hasTransientVar(String name) {
419        return transientVars.containsKey(name);
420    }
421
422    public Object getTransientVar(String name) {
423        return transientVars.get(name);
424    }
425
426    public boolean hasEnded() {
427        return status.isEndState();
428    }
429
430    private List<String> terminateNodes(Status endStatus) {
431        List<String> endNodes = new ArrayList<String>();
432        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
433            if (entry.getValue().started) {
434                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
435                if (!(nodeDef instanceof ControlNodeDef)) {
436                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
437                    try {
438                        if (endStatus == Status.KILLED) {
439                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
440                        }
441                        else {
442                            if (endStatus == Status.FAILED) {
443                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
444                            }
445                        }
446                        endNodes.add(nodeDef.getName());
447                    }
448                    catch (Exception ex) {
449                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
450                                 nodeDef.getName(), ex);
451                    }
452                }
453            }
454        }
455        return endNodes;
456    }
457
458    private String failNode(String nodeName) {
459        String failedNode = null;
460        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
461            String node = entry.getKey();
462            NodeInstance nodeInstance = entry.getValue();
463            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
464                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
465                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
466                try {
467                    nodeHandler.fail(new Context(nodeDef, node, null));
468                    failedNode = nodeDef.getName();
469                    nodeInstance.started = false;
470                }
471                catch (Exception ex) {
472                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
473                }
474                return failedNode;
475            }
476        }
477        return failedNode;
478    }
479
480    private List<String> killNodes() {
481        List<String> killedNodes = new ArrayList<String>();
482        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
483            String node = entry.getKey();
484            NodeInstance nodeInstance = entry.getValue();
485            if (nodeInstance.started) {
486                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
487                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
488                try {
489                    nodeHandler.kill(new Context(nodeDef, node, null));
490                    killedNodes.add(nodeDef.getName());
491                }
492                catch (Exception ex) {
493                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
494                }
495            }
496        }
497        return killedNodes;
498    }
499
500    public LiteWorkflowApp getProcessDefinition() {
501        return def;
502    }
503
504    private static String createChildPath(String path, String child) {
505        return path + child + PATH_SEPARATOR;
506    }
507
508    private static String getParentPath(String path) {
509        path = path.substring(0, path.length() - 1);
510        return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
511    }
512
513    private static String createFullTransition(String executionPath, String transition) {
514        return executionPath + TRANSITION_SEPARATOR + transition;
515    }
516
517    private static String getExecutionPath(String fullTransition) {
518        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
519        if (index == -1) {
520            throw new IllegalArgumentException("Invalid fullTransition");
521        }
522        return fullTransition.substring(0, index);
523    }
524
525    private static String getTransitionNode(String fullTransition) {
526        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
527        if (index == -1) {
528            throw new IllegalArgumentException("Invalid fullTransition");
529        }
530        return fullTransition.substring(index + 1);
531    }
532
533    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
534        return (NodeHandler) ReflectionUtils.newInstance(handler, null);
535    }
536
537    private void refreshLog() {
538        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
539        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
540        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
541        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
542        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
543        log = XLog.getLog(getClass());
544    }
545
546    public Status getStatus() {
547        return status;
548    }
549
550    public void setStatus(Status status) {
551        this.status = status;
552    }
553
554    @Override
555    public void write(DataOutput dOut) throws IOException {
556        dOut.writeUTF(instanceId);
557
558        //Hadoop Configuration has to get its act right
559        ByteArrayOutputStream baos = new ByteArrayOutputStream();
560        conf.writeXml(baos);
561        baos.close();
562        byte[] array = baos.toByteArray();
563        dOut.writeInt(array.length);
564        dOut.write(array);
565
566        def.write(dOut);
567        dOut.writeUTF(status.toString());
568        dOut.writeInt(executionPaths.size());
569        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
570            dOut.writeUTF(entry.getKey());
571            dOut.writeUTF(entry.getValue().nodeName);
572            dOut.writeBoolean(entry.getValue().started);
573        }
574        dOut.writeInt(persistentVars.size());
575        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
576            dOut.writeUTF(entry.getKey());
577            dOut.writeUTF(entry.getValue());
578        }
579    }
580
581    @Override
582    public void readFields(DataInput dIn) throws IOException {
583        instanceId = dIn.readUTF();
584
585        //Hadoop Configuration has to get its act right
586        int len = dIn.readInt();
587        byte[] array = new byte[len];
588        dIn.readFully(array);
589        ByteArrayInputStream bais = new ByteArrayInputStream(array);
590        conf = new XConfiguration(bais);
591
592        def = new LiteWorkflowApp();
593        def.readFields(dIn);
594        status = Status.valueOf(dIn.readUTF());
595        int numExPaths = dIn.readInt();
596        for (int x = 0; x < numExPaths; x++) {
597            String path = dIn.readUTF();
598            String nodeName = dIn.readUTF();
599            boolean isStarted = dIn.readBoolean();
600            NodeInstance nodeInstance = new NodeInstance(nodeName);
601            nodeInstance.started = isStarted;
602            executionPaths.put(path, nodeInstance);
603        }
604        int numVars = dIn.readInt();
605        for (int x = 0; x < numVars; x++) {
606            String vName = dIn.readUTF();
607            String vVal = dIn.readUTF();
608            persistentVars.put(vName, vVal);
609        }
610        refreshLog();
611    }
612
613    @Override
614    public Configuration getConf() {
615        return conf;
616    }
617
618    @Override
619    public WorkflowApp getApp() {
620        return def;
621    }
622
623    @Override
624    public String getId() {
625        return instanceId;
626    }
627
628    @Override
629    public String getTransition(String node) {
630        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
631    }
632
633    @Override
634    public boolean equals(Object o) {
635        return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
636    }
637
638    @Override
639    public int hashCode() {
640        return instanceId.hashCode();
641    }
642
643}