001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.workflow.lite;
019
020import org.apache.oozie.workflow.WorkflowException;
021import org.apache.oozie.util.IOUtils;
022import org.apache.oozie.util.XmlUtils;
023import org.apache.oozie.util.ParamChecker;
024import org.apache.oozie.util.ParameterVerifier;
025import org.apache.oozie.util.ParameterVerifierException;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.action.ActionExecutor;
028import org.apache.oozie.service.Services;
029import org.apache.oozie.service.ActionService;
030import org.apache.hadoop.conf.Configuration;
031import org.jdom.Element;
032import org.jdom.JDOMException;
033import org.jdom.Namespace;
034import org.xml.sax.SAXException;
035
036import javax.xml.transform.stream.StreamSource;
037import javax.xml.validation.Schema;
038import javax.xml.validation.Validator;
039import java.io.IOException;
040import java.io.Reader;
041import java.io.StringReader;
042import java.io.StringWriter;
043import java.util.ArrayList;
044import java.util.Arrays;
045import java.util.Deque;
046import java.util.HashMap;
047import java.util.HashSet;
048import java.util.LinkedList;
049import java.util.List;
050import java.util.Map;
051
052/**
053 * Class to parse and validate workflow xml
054 */
055public class LiteWorkflowAppParser {
056
057    private static final String DECISION_E = "decision";
058    private static final String ACTION_E = "action";
059    private static final String END_E = "end";
060    private static final String START_E = "start";
061    private static final String JOIN_E = "join";
062    private static final String FORK_E = "fork";
063    private static final Object KILL_E = "kill";
064
065    private static final String SLA_INFO = "info";
066    private static final String CREDENTIALS = "credentials";
067    private static final String GLOBAL = "global";
068    private static final String PARAMETERS = "parameters";
069
070    private static final String NAME_A = "name";
071    private static final String CRED_A = "cred";
072    private static final String USER_RETRY_MAX_A = "retry-max";
073    private static final String USER_RETRY_INTERVAL_A = "retry-interval";
074    private static final String TO_A = "to";
075
076    private static final String FORK_PATH_E = "path";
077    private static final String FORK_START_A = "start";
078
079    private static final String ACTION_OK_E = "ok";
080    private static final String ACTION_ERROR_E = "error";
081
082    private static final String DECISION_SWITCH_E = "switch";
083    private static final String DECISION_CASE_E = "case";
084    private static final String DECISION_DEFAULT_E = "default";
085
086    private static final String KILL_MESSAGE_E = "message";
087    public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
088    public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin";
089
090    private Schema schema;
091    private Class<? extends ControlNodeHandler> controlNodeHandler;
092    private Class<? extends DecisionNodeHandler> decisionHandlerClass;
093    private Class<? extends ActionNodeHandler> actionHandlerClass;
094
095    private static enum VisitStatus {
096        VISITING, VISITED
097    }
098
099    private List<String> forkList = new ArrayList<String>();
100    private List<String> joinList = new ArrayList<String>();
101    private StartNodeDef startNode;
102    private List<String> visitedOkNodes = new ArrayList<String>();
103    private List<String> visitedJoinNodes = new ArrayList<String>();
104
105    public LiteWorkflowAppParser(Schema schema,
106                                 Class<? extends ControlNodeHandler> controlNodeHandler,
107                                 Class<? extends DecisionNodeHandler> decisionHandlerClass,
108                                 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
109        this.schema = schema;
110        this.controlNodeHandler = controlNodeHandler;
111        this.decisionHandlerClass = decisionHandlerClass;
112        this.actionHandlerClass = actionHandlerClass;
113    }
114
115    /**
116     * Parse and validate xml to {@link LiteWorkflowApp}
117     *
118     * @param reader
119     * @return LiteWorkflowApp
120     * @throws WorkflowException
121     */
122    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
123        try {
124            StringWriter writer = new StringWriter();
125            IOUtils.copyCharStream(reader, writer);
126            String strDef = writer.toString();
127
128            if (schema != null) {
129                Validator validator = schema.newValidator();
130                validator.validate(new StreamSource(new StringReader(strDef)));
131            }
132
133            Element wfDefElement = XmlUtils.parseXml(strDef);
134            ParameterVerifier.verifyParameters(jobConf, wfDefElement);
135            LiteWorkflowApp app = parse(strDef, wfDefElement);
136            Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
137            traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
138            validate(app, app.getNode(StartNodeDef.START), traversed);
139            //Validate whether fork/join are in pair or not
140            if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
141                validateForkJoin(app);
142            }
143            return app;
144        }
145        catch (ParameterVerifierException ex) {
146            throw new WorkflowException(ex);
147        }
148        catch (JDOMException ex) {
149            throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
150        }
151        catch (SAXException ex) {
152            throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
153        }
154        catch (IOException ex) {
155            throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
156        }
157    }
158
159    /**
160     * Validate whether fork/join are in pair or not
161     * @param app LiteWorkflowApp
162     * @throws WorkflowException
163     */
164    private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
165        // Make sure the number of forks and joins in wf are equal
166        if (forkList.size() != joinList.size()) {
167            throw new WorkflowException(ErrorCode.E0730);
168        }
169
170        // No need to bother going through all of this if there are no fork/join nodes
171        if (!forkList.isEmpty()) {
172            visitedOkNodes.clear();
173            visitedJoinNodes.clear();
174            validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true);
175        }
176    }
177
178    /*
179     * Recursively walk through the DAG and make sure that all fork paths are valid.
180     * This should be called from validateForkJoin(LiteWorkflowApp app).  It assumes that visitedOkNodes and visitedJoinNodes are
181     * both empty ArrayLists on the first call.
182     *
183     * @param node the current node; use the startNode on the first call
184     * @param app the WorkflowApp
185     * @param forkNodes a stack of the current fork nodes
186     * @param joinNodes a stack of the current join nodes
187     * @param path a stack of the current path
188     * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has
189     * already been visited at least once before
190     * @throws WorkflowException
191     */
192    private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes,
193            Deque<String> path, boolean okTo) throws WorkflowException {
194        if (path.contains(node.getName())) {
195            // cycle
196            throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray()));
197        }
198        path.push(node.getName());
199
200        // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an
201        // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times.  Also, because we
202        // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just
203        // re-walking the same execution path (this is why we need the visitedJoinNodes list used later)
204        if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
205            if (visitedOkNodes.contains(node.getName())) {
206                throw new WorkflowException(ErrorCode.E0743, node.getName());
207            }
208            visitedOkNodes.add(node.getName());
209        }
210
211        if (node instanceof StartNodeDef) {
212            String transition = node.getTransitions().get(0);   // start always has only 1 transition
213            NodeDef tranNode = app.getNode(transition);
214            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
215        }
216        else if (node instanceof ActionNodeDef) {
217            String transition = node.getTransitions().get(0);   // "ok to" transition
218            NodeDef tranNode = app.getNode(transition);
219            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);  // propogate okTo
220            transition = node.getTransitions().get(1);          // "error to" transition
221            tranNode = app.getNode(transition);
222            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false); // use false
223        }
224        else if (node instanceof DecisionNodeDef) {
225            for(String transition : (new HashSet<String>(node.getTransitions()))) {
226                NodeDef tranNode = app.getNode(transition);
227                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
228            }
229        }
230        else if (node instanceof ForkNodeDef) {
231            forkNodes.push(node.getName());
232            for(String transition : (new HashSet<String>(node.getTransitions()))) {
233                NodeDef tranNode = app.getNode(transition);
234                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
235            }
236            forkNodes.pop();
237            if (!joinNodes.isEmpty()) {
238                joinNodes.pop();
239            }
240        }
241        else if (node instanceof JoinNodeDef) {
242            if (forkNodes.isEmpty()) {
243                // no fork for join to match with
244                throw new WorkflowException(ErrorCode.E0742, node.getName());
245            }
246            if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) {
247                joinNodes.push(node.getName());
248            }
249            if (!joinNodes.peek().equals(node.getName())) {
250                // join doesn't match fork
251                throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek());
252            }
253            joinNodes.pop();
254            String currentForkNode = forkNodes.pop();
255            String transition = node.getTransitions().get(0);   // join always has only 1 transition
256            NodeDef tranNode = app.getNode(transition);
257            // If we're already under a situation where okTo is false, use false (propogate it)
258            // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't
259            // want to throw an exception from the check against visitedOkNodes)
260            if (!okTo || visitedJoinNodes.contains(node.getName())) {
261                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false);
262            // Else, use true because this is either the first time we've gone through this join node or okTo was already false
263            } else {
264                visitedJoinNodes.add(node.getName());
265                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true);
266            }
267            forkNodes.push(currentForkNode);
268            joinNodes.push(node.getName());
269        }
270        else if (node instanceof KillNodeDef) {
271            // do nothing
272        }
273        else if (node instanceof EndNodeDef) {
274            if (!forkNodes.isEmpty()) {
275                path.pop();     // = node
276                String parent = path.peek();
277                // can't go to an end node in a fork
278                throw new WorkflowException(ErrorCode.E0737, parent, node.getName());
279            }
280        }
281        else {
282            // invalid node type (shouldn't happen)
283            throw new WorkflowException(ErrorCode.E0740, node.getName());
284        }
285        path.pop();
286    }
287
288    /**
289     * Parse xml to {@link LiteWorkflowApp}
290     *
291     * @param strDef
292     * @param root
293     * @return LiteWorkflowApp
294     * @throws WorkflowException
295     */
296    @SuppressWarnings({"unchecked", "ConstantConditions"})
297    private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
298        Namespace ns = root.getNamespace();
299        LiteWorkflowApp def = null;
300        Element global = null;
301        for (Element eNode : (List<Element>) root.getChildren()) {
302            if (eNode.getName().equals(START_E)) {
303                def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
304                                          new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
305            }
306            else {
307                if (eNode.getName().equals(END_E)) {
308                    def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
309                }
310                else {
311                    if (eNode.getName().equals(KILL_E)) {
312                        def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
313                                                    eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
314                    }
315                    else {
316                        if (eNode.getName().equals(FORK_E)) {
317                            List<String> paths = new ArrayList<String>();
318                            for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
319                                paths.add(tran.getAttributeValue(FORK_START_A));
320                            }
321                            def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
322                        }
323                        else {
324                            if (eNode.getName().equals(JOIN_E)) {
325                                def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
326                                                            eNode.getAttributeValue(TO_A)));
327                            }
328                            else {
329                                if (eNode.getName().equals(DECISION_E)) {
330                                    Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
331                                    List<String> transitions = new ArrayList<String>();
332                                    for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
333                                        transitions.add(e.getAttributeValue(TO_A));
334                                    }
335                                    transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
336
337                                    String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
338                                    def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
339                                                                    transitions));
340                                }
341                                else {
342                                    if (ACTION_E.equals(eNode.getName())) {
343                                        String[] transitions = new String[2];
344                                        Element eActionConf = null;
345                                        for (Element elem : (List<Element>) eNode.getChildren()) {
346                                            if (ACTION_OK_E.equals(elem.getName())) {
347                                                transitions[0] = elem.getAttributeValue(TO_A);
348                                            }
349                                            else {
350                                                if (ACTION_ERROR_E.equals(elem.getName())) {
351                                                    transitions[1] = elem.getAttributeValue(TO_A);
352                                                }
353                                                else {
354                                                    if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
355                                                        continue;
356                                                    }
357                                                    else {
358                                                        eActionConf = elem;
359                                                        handleGlobal(ns, global, elem);
360                                                        }
361                                                }
362                                            }
363                                        }
364
365                                        String credStr = eNode.getAttributeValue(CRED_A);
366                                        String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
367                                        String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
368
369                                        String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
370                                        def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
371                                                                      transitions[0], transitions[1], credStr,
372                                                                      userRetryMaxStr, userRetryIntervalStr));
373                                    }
374                                    else {
375                                        if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
376                                            // No operation is required
377                                        }
378                                        else {
379                                            if (eNode.getName().equals(GLOBAL)) {
380                                                global = eNode;
381                                            }
382                                            else {
383                                                if (eNode.getName().equals(PARAMETERS)) {
384                                                    // No operation is required
385                                                }
386                                                else {
387                                                    throw new WorkflowException(ErrorCode.E0703, eNode.getName());
388                                                }
389                                            }
390                                        }
391                                    }
392                                }
393                            }
394                        }
395                    }
396                }
397            }
398        }
399        return def;
400    }
401
402    /**
403     * Validate workflow xml
404     *
405     * @param app
406     * @param node
407     * @param traversed
408     * @throws WorkflowException
409     */
410    private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
411        if (node instanceof StartNodeDef) {
412            startNode = (StartNodeDef) node;
413        }
414        else {
415            try {
416                ParamChecker.validateActionName(node.getName());
417            }
418            catch (IllegalArgumentException ex) {
419                throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
420            }
421        }
422        if (node instanceof ActionNodeDef) {
423            try {
424                Element action = XmlUtils.parseXml(node.getConf());
425                boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
426                if (!supportedAction) {
427                    throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
428                }
429            }
430            catch (JDOMException ex) {
431                throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
432            }
433        }
434
435        if(node instanceof ForkNodeDef){
436            forkList.add(node.getName());
437        }
438
439        if(node instanceof JoinNodeDef){
440            joinList.add(node.getName());
441        }
442
443        if (node instanceof EndNodeDef) {
444            traversed.put(node.getName(), VisitStatus.VISITED);
445            return;
446        }
447        if (node instanceof KillNodeDef) {
448            traversed.put(node.getName(), VisitStatus.VISITED);
449            return;
450        }
451        for (String transition : node.getTransitions()) {
452
453            if (app.getNode(transition) == null) {
454                throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
455            }
456
457            //check if it is a cycle
458            if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
459                throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
460            }
461            //ignore validated one
462            if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
463                continue;
464            }
465
466            traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
467            validate(app, app.getNode(transition), traversed);
468        }
469        traversed.put(node.getName(), VisitStatus.VISITED);
470    }
471
472    /**
473     * Handle the global section
474     *
475     * @param ns
476     * @param global
477     * @param eActionConf
478     * @throws WorkflowException
479     */
480
481    private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException {
482
483        // Use the action's namespace when getting children of the action (will be different than ns for extension actions)
484        Namespace actionNs = eActionConf.getNamespace();
485
486        if (global != null) {
487            Element globalJobTracker = global.getChild("job-tracker", ns);
488            Element globalNameNode = global.getChild("name-node", ns);
489            List<Element> globalJobXml = global.getChildren("job-xml", ns);
490            Element globalConfiguration = global.getChild("configuration", ns);
491
492            if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) {
493                Element jobTracker = new Element("job-tracker", actionNs);
494                jobTracker.setText(globalJobTracker.getText());
495                eActionConf.addContent(jobTracker);
496            }
497
498            if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) {
499                Element nameNode = new Element("name-node", actionNs);
500                nameNode.setText(globalNameNode.getText());
501                eActionConf.addContent(nameNode);
502            }
503
504            if (!globalJobXml.isEmpty()) {
505                List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs);
506                for(Element jobXml: globalJobXml){
507                    boolean alreadyExists = false;
508                    for(Element actionXml: actionJobXml){
509                        if(jobXml.getText().equals(actionXml.getText())){
510                            alreadyExists = true;
511                        }
512                    }
513
514                    if (!alreadyExists){
515                        Element ejobXml = new Element("job-xml", actionNs);
516                        ejobXml.setText(jobXml.getText());
517                        eActionConf.addContent(ejobXml);
518                    }
519
520                }
521            }
522
523            if (globalConfiguration != null) {
524                Element actionConfiguration = eActionConf.getChild("configuration", actionNs);
525                if (actionConfiguration == null) {
526                    actionConfiguration = new Element("configuration", actionNs);
527                    eActionConf.addContent(actionConfiguration);
528                }
529                for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) {
530                    boolean isSet = false;
531                    String globalVarName = globalConfig.getChildText("name", ns);
532                    for (Element local : (List<Element>) actionConfiguration.getChildren()) {
533                        if (local.getChildText("name", actionNs).equals(globalVarName)) {
534                            isSet = true;
535                        }
536                    }
537                    if (!isSet) {
538                        Element varToCopy = new Element("property", actionNs);
539                        Element varName = new Element("name", actionNs);
540                        Element varValue = new Element("value", actionNs);
541
542                        varName.setText(globalConfig.getChildText("name", ns));
543                        varValue.setText(globalConfig.getChildText("value", ns));
544
545                        varToCopy.addContent(varName);
546                        varToCopy.addContent(varValue);
547
548                        actionConfiguration.addContent(varToCopy);
549                    }
550                }
551            }
552        }
553        else {
554            ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName());
555            if (ae == null) {
556                throw new WorkflowException(ErrorCode.E0723, eActionConf.getName(), ActionService.class.getName());
557            }
558            if (ae.requiresNNJT) {
559
560                if (eActionConf.getChild("name-node", actionNs) == null) {
561                    throw new WorkflowException(ErrorCode.E0701, "No name-node defined");
562                }
563                if (eActionConf.getChild("job-tracker", actionNs) == null) {
564                    throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined");
565                }
566            }
567        }
568    }
569
570}