001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.InputStreamReader;
022import java.io.Reader;
023import java.io.StringReader;
024import java.io.StringWriter;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.ArrayList;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.TreeSet;
036
037import javax.xml.transform.stream.StreamSource;
038import javax.xml.validation.Validator;
039
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.oozie.CoordinatorJobBean;
044import org.apache.oozie.ErrorCode;
045import org.apache.oozie.client.CoordinatorJob;
046import org.apache.oozie.client.Job;
047import org.apache.oozie.client.OozieClient;
048import org.apache.oozie.client.CoordinatorJob.Execution;
049import org.apache.oozie.command.CommandException;
050import org.apache.oozie.command.SubmitTransitionXCommand;
051import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
052import org.apache.oozie.coord.CoordELEvaluator;
053import org.apache.oozie.coord.CoordELFunctions;
054import org.apache.oozie.coord.CoordinatorJobException;
055import org.apache.oozie.coord.TimeUnit;
056import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
057import org.apache.oozie.executor.jpa.JPAExecutorException;
058import org.apache.oozie.service.DagXLogInfoService;
059import org.apache.oozie.service.HadoopAccessorException;
060import org.apache.oozie.service.HadoopAccessorService;
061import org.apache.oozie.service.JPAService;
062import org.apache.oozie.service.SchemaService;
063import org.apache.oozie.service.Service;
064import org.apache.oozie.service.Services;
065import org.apache.oozie.service.UUIDService;
066import org.apache.oozie.service.SchemaService.SchemaName;
067import org.apache.oozie.service.UUIDService.ApplicationType;
068import org.apache.oozie.util.ConfigUtils;
069import org.apache.oozie.util.DateUtils;
070import org.apache.oozie.util.ELEvaluator;
071import org.apache.oozie.util.ELUtils;
072import org.apache.oozie.util.IOUtils;
073import org.apache.oozie.util.InstrumentUtils;
074import org.apache.oozie.util.LogUtils;
075import org.apache.oozie.util.ParamChecker;
076import org.apache.oozie.util.ParameterVerifier;
077import org.apache.oozie.util.ParameterVerifierException;
078import org.apache.oozie.util.PropertiesUtils;
079import org.apache.oozie.util.XConfiguration;
080import org.apache.oozie.util.XLog;
081import org.apache.oozie.util.XmlUtils;
082import org.jdom.Attribute;
083import org.jdom.Element;
084import org.jdom.JDOMException;
085import org.jdom.Namespace;
086import org.xml.sax.SAXException;
087
088/**
089 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
090 * table.
091 * <p/>
092 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
093 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
094 * at runtime.
095 */
096public class CoordSubmitXCommand extends SubmitTransitionXCommand {
097
098    private Configuration conf;
099    private final String bundleId;
100    private final String coordName;
101    private boolean dryrun;
102    private JPAService jpaService = null;
103    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
104
105    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
106    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
107    public final String COORD_INPUT_EVENTS ="input-events";
108    public final String COORD_OUTPUT_EVENTS = "output-events";
109    public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
110    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
111
112    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
113    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
114
115    private CoordinatorJobBean coordJob = null;
116    /**
117     * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
118     */
119    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
120
121    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
122
123    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
124
125    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
126            + "coord.materialization.throttling.factor";
127
128    /**
129     * Default MAX timeout in minutes, after which coordinator input check will timeout
130     */
131    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
132
133
134    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
135
136    private ELEvaluator evalFreq = null;
137    private ELEvaluator evalNofuncs = null;
138    private ELEvaluator evalData = null;
139    private ELEvaluator evalInst = null;
140    private ELEvaluator evalAction = null;
141    private ELEvaluator evalSla = null;
142
143    static {
144        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
145                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
146                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
147                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
148                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
149        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
150
151        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
152        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
153        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
154    }
155
156    /**
157     * Constructor to create the Coordinator Submit Command.
158     *
159     * @param conf : Configuration for Coordinator job
160     */
161    public CoordSubmitXCommand(Configuration conf) {
162        super("coord_submit", "coord_submit", 1);
163        this.conf = ParamChecker.notNull(conf, "conf");
164        this.bundleId = null;
165        this.coordName = null;
166    }
167
168    /**
169     * Constructor to create the Coordinator Submit Command by bundle job.
170     *
171     * @param conf : Configuration for Coordinator job
172     * @param bundleId : bundle id
173     * @param coordName : coord name
174     */
175    public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
176        super("coord_submit", "coord_submit", 1);
177        this.conf = ParamChecker.notNull(conf, "conf");
178        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
179        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
180    }
181
182    /**
183     * Constructor to create the Coordinator Submit Command.
184     *
185     * @param dryrun : if dryrun
186     * @param conf : Configuration for Coordinator job
187     */
188    public CoordSubmitXCommand(boolean dryrun, Configuration conf) {
189        this(conf);
190        this.dryrun = dryrun;
191    }
192
193    /* (non-Javadoc)
194     * @see org.apache.oozie.command.XCommand#execute()
195     */
196    @Override
197    protected String submit() throws CommandException {
198        String jobId = null;
199        LOG.info("STARTED Coordinator Submit");
200        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
201
202        boolean exceptionOccured = false;
203        try {
204            mergeDefaultConfig();
205
206            String appXml = readAndValidateXml();
207            coordJob.setOrigJobXml(appXml);
208            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
209
210            Element eXml = XmlUtils.parseXml(appXml);
211
212            String appNamespace = readAppNamespace(eXml);
213            coordJob.setAppNamespace(appNamespace);
214
215            ParameterVerifier.verifyParameters(conf, eXml);
216
217            appXml = XmlUtils.removeComments(appXml);
218            initEvaluators();
219            Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
220
221            validateCoordinatorJob();
222
223            // checking if the coordinator application data input/output events
224            // specify multiple data instance values in erroneous manner
225            checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
226            checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
227
228            LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
229
230            jobId = storeToDB(eJob, coordJob);
231            // log job info for coordinator job
232            LogUtils.setLogInfo(coordJob, logInfo);
233            LOG = XLog.resetPrefix(LOG);
234
235            if (!dryrun) {
236                // submit a command to materialize jobs for the next 1 hour (3600 secs)
237                // so we don't wait 10 mins for the Service to run.
238                queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
239            }
240            else {
241                Date startTime = coordJob.getStartTime();
242                long startTimeMilli = startTime.getTime();
243                long endTimeMilli = startTimeMilli + (3600 * 1000);
244                Date jobEndTime = coordJob.getEndTime();
245                Date endTime = new Date(endTimeMilli);
246                if (endTime.compareTo(jobEndTime) > 0) {
247                    endTime = jobEndTime;
248                }
249                jobId = coordJob.getId();
250                LOG.info("[" + jobId + "]: Update status to RUNNING");
251                coordJob.setStatus(Job.Status.RUNNING);
252                coordJob.setPending();
253                CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
254                        endTime);
255                Configuration jobConf = null;
256                try {
257                    jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
258                }
259                catch (IOException e1) {
260                    LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
261                }
262                String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
263                String output = coordJob.getJobXml() + System.getProperty("line.separator")
264                + "***actions for instance***" + action;
265                return output;
266            }
267        }
268        catch (JDOMException jex) {
269            exceptionOccured = true;
270            LOG.warn("ERROR: ", jex);
271            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
272        }
273        catch (CoordinatorJobException cex) {
274            exceptionOccured = true;
275            LOG.warn("ERROR:  ", cex);
276            throw new CommandException(cex);
277        }
278        catch (ParameterVerifierException pex) {
279            exceptionOccured = true;
280            LOG.warn("ERROR: ", pex);
281            throw new CommandException(pex);
282        }
283        catch (IllegalArgumentException iex) {
284            exceptionOccured = true;
285            LOG.warn("ERROR:  ", iex);
286            throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex);
287        }
288        catch (Exception ex) {
289            exceptionOccured = true;
290            LOG.warn("ERROR:  ", ex);
291            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
292        }
293        finally {
294            if (exceptionOccured) {
295                if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
296                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
297                    coordJob.resetPending();
298                }
299            }
300        }
301
302        LOG.info("ENDED Coordinator Submit jobId=" + jobId);
303        return jobId;
304    }
305
306    /**
307     * Method that validates values in the definition for correctness. Placeholder to add more.
308     */
309    private void validateCoordinatorJob() {
310        // check if startTime < endTime
311        if (coordJob.getStartTime().after(coordJob.getEndTime())) {
312            throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time.");
313        }
314    }
315
316  /*
317  * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
318  * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
319  */
320    private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
321        Element eventsSpec, dataSpec, instance;
322        List<Element> instanceSpecList;
323        Namespace ns = eCoordJob.getNamespace();
324        String instanceValue;
325        eventsSpec = eCoordJob.getChild(eventType, ns);
326        if (eventsSpec != null) {
327            dataSpec = eventsSpec.getChild(dataType, ns);
328            if (dataSpec != null) {
329                // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
330                instanceSpecList = dataSpec.getChildren("instance", ns);
331                Iterator instanceIter = instanceSpecList.iterator();
332                while(instanceIter.hasNext()) {
333                    instance = ((Element) instanceIter.next());
334                    if(instance.getContentSize() == 0) { //empty string or whitespace
335                        throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
336                    }
337                    instanceValue = instance.getContent(0).toString();
338                    boolean isInvalid = false;
339                    try {
340                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
341                    } catch (Exception e) {
342                        handleELParseException(eventType, dataType, instanceValue);
343                    }
344                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
345                        handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
346                    }
347                }
348
349                // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
350                instanceSpecList = dataSpec.getChildren("start-instance", ns);
351                instanceIter = instanceSpecList.iterator();
352                while(instanceIter.hasNext()) {
353                    instance = ((Element) instanceIter.next());
354                    if(instance.getContentSize() == 0) { //empty string or whitespace
355                        throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
356                    }
357                    instanceValue = instance.getContent(0).toString();
358                    boolean isInvalid = false;
359                    try {
360                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
361                    } catch (Exception e) {
362                        handleELParseException(eventType, dataType, instanceValue);
363                    }
364                    if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
365                        handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
366                    }
367                }
368
369                // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
370                instanceSpecList = dataSpec.getChildren("end-instance", ns);
371                instanceIter = instanceSpecList.iterator();
372                while(instanceIter.hasNext()) {
373                    instance = ((Element) instanceIter.next());
374                    if(instance.getContentSize() == 0) { //empty string or whitespace
375                        throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
376                    }
377                    instanceValue = instance.getContent(0).toString();
378                    boolean isInvalid = false;
379                    try {
380                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
381                    } catch (Exception e) {
382                        handleELParseException(eventType, dataType, instanceValue);
383                    }
384                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
385                        handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
386                    }
387                }
388
389            }
390        }
391    }
392
393    private void handleELParseException(String eventType, String dataType, String instanceValue)
394            throws CoordinatorJobException {
395        String correctAction = null;
396        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
397            correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
398        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
399            correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
400        }
401        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
402                + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
403    }
404
405    private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
406            throws CoordinatorJobException {
407        String correctAction = null;
408        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
409            correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
410        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
411            correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
412        }
413        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
414                + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
415    }
416
417    private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
418            throws CoordinatorJobException {
419        String correctAction = "Coordinator app definition should not have multiple start-instances";
420        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
421                + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
422    }
423
424    private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
425            throws CoordinatorJobException {
426        String correctAction = "Coordinator app definition should not have multiple end-instances";
427        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
428                + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
429    }
430
431    /**
432     * Read the application XML and validate against coordinator Schema
433     *
434     * @return validated coordinator XML
435     * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
436     */
437    private String readAndValidateXml() throws CoordinatorJobException {
438        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
439                OozieClient.COORDINATOR_APP_PATH);
440        String coordXml = readDefinition(appPath);
441        validateXml(coordXml);
442        return coordXml;
443    }
444
445    /**
446     * Validate against Coordinator XSD file
447     *
448     * @param xmlContent : Input coordinator xml
449     * @throws CoordinatorJobException thrown if unable to validate coordinator xml
450     */
451    private void validateXml(String xmlContent) throws CoordinatorJobException {
452        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
453        Validator validator = schema.newValidator();
454        try {
455            validator.validate(new StreamSource(new StringReader(xmlContent)));
456        }
457        catch (SAXException ex) {
458            LOG.warn("SAXException :", ex);
459            throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
460        }
461        catch (IOException ex) {
462            LOG.warn("IOException :", ex);
463            throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
464        }
465    }
466
467    /**
468     * Read the application XML schema namespace
469     *
470     * @param coordXmlElement input coordinator xml Element
471     * @return app xml namespace
472     * @throws CoordinatorJobException
473     */
474    private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
475        Namespace ns = coordXmlElement.getNamespace();
476        if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
477            throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
478                    + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
479        }
480        if (ns != null) {
481            return ns.getURI();
482        }
483        else {
484            throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
485        }
486    }
487
488    /**
489     * Merge default configuration with user-defined configuration.
490     *
491     * @throws CommandException thrown if failed to read or merge configurations
492     */
493    protected void mergeDefaultConfig() throws CommandException {
494        Path configDefault = null;
495        try {
496            String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
497            Path coordAppPath = new Path(coordAppPathStr);
498            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
499            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
500            Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
501            FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
502
503            // app path could be a directory
504            if (!fs.isFile(coordAppPath)) {
505                configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
506            } else {
507                configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
508            }
509
510            if (fs.exists(configDefault)) {
511                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
512                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
513                XConfiguration.injectDefaults(defaultConf, conf);
514            }
515            else {
516                LOG.info("configDefault Doesn't exist " + configDefault);
517            }
518            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
519
520            // Resolving all variables in the job properties.
521            // This ensures the Hadoop Configuration semantics is preserved.
522            XConfiguration resolvedVarsConf = new XConfiguration();
523            for (Map.Entry<String, String> entry : conf) {
524                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
525            }
526            conf = resolvedVarsConf;
527        }
528        catch (IOException e) {
529            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
530                    + configDefault, e);
531        }
532        catch (HadoopAccessorException e) {
533            throw new CommandException(e);
534        }
535        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
536    }
537
538    /**
539     * The method resolve all the variables that are defined in configuration. It also include the data set definition
540     * from dataset file into XML.
541     *
542     * @param appXml : Original job XML
543     * @param conf : Configuration of the job
544     * @param coordJob : Coordinator job bean to be populated.
545     * @return Resolved and modified job XML element.
546     * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
547     * @throws Exception thrown if failed to resolve basic entities or include referred datasets
548     */
549    public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
550    throws CoordinatorJobException, Exception {
551        Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
552        includeDataSets(basicResolvedApp, conf);
553        return basicResolvedApp;
554    }
555
556    /**
557     * Insert data set into data-in and data-out tags.
558     *
559     * @param eAppXml : coordinator application XML
560     * @param eDatasets : DataSet XML
561     */
562    @SuppressWarnings("unchecked")
563    private void insertDataSet(Element eAppXml, Element eDatasets) {
564        // Adding DS definition in the coordinator XML
565        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
566        if (inputList != null) {
567            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
568                Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
569                dataIn.getContent().add(0, eDataset);
570            }
571        }
572        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
573        if (outputList != null) {
574            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
575                Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
576                dataOut.getContent().add(0, eDataset);
577            }
578        }
579    }
580
581    /**
582     * Find a specific dataset from a list of Datasets.
583     *
584     * @param eDatasets : List of data sets
585     * @param name : queried data set name
586     * @return one Dataset element. otherwise throw Exception
587     */
588    @SuppressWarnings("unchecked")
589    private static Element findDataSet(Element eDatasets, String name) {
590        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
591            if (eDataset.getAttributeValue("name").equals(name)) {
592                eDataset = (Element) eDataset.clone();
593                eDataset.detach();
594                return eDataset;
595            }
596        }
597        throw new RuntimeException("undefined dataset: " + name);
598    }
599
600    /**
601     * Initialize all the required EL Evaluators.
602     */
603    protected void initEvaluators() {
604        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
605        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
606        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
607        evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
608        evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
609    }
610
611    /**
612     * Resolve basic entities using job Configuration.
613     *
614     * @param conf :Job configuration
615     * @param appXml : Original job XML
616     * @param coordJob : Coordinator job bean to be populated.
617     * @return Resolved job XML element.
618     * @throws CoordinatorJobException thrown if failed to resolve basic entities
619     * @throws Exception thrown if failed to resolve basic entities
620     */
621    @SuppressWarnings("unchecked")
622    protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
623    throws CoordinatorJobException, Exception {
624        Element eAppXml = XmlUtils.parseXml(appXml);
625        // job's main attributes
626        // frequency
627        String val = resolveAttribute("frequency", eAppXml, evalFreq);
628        int ival = ParamChecker.checkInteger(val, "frequency");
629        ParamChecker.checkGTZero(ival, "frequency");
630        coordJob.setFrequency(Integer.toString(ival));
631        TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
632                .getVariable("timeunit"));
633        addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
634        // TimeUnit
635        coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
636        // End Of Duration
637        tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
638                .getVariable("endOfDuration"));
639        addAnAttribute("end_of_duration", eAppXml, tmp.toString());
640        // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
641
642        // Application name
643        if (this.coordName == null) {
644            String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
645            coordJob.setAppName(name);
646        }
647        else {
648            // this coord job is created from bundle
649            coordJob.setAppName(this.coordName);
650        }
651
652        // start time
653        val = resolveAttribute("start", eAppXml, evalNofuncs);
654        ParamChecker.checkDateOozieTZ(val, "start");
655        coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
656        // end time
657        val = resolveAttribute("end", eAppXml, evalNofuncs);
658        ParamChecker.checkDateOozieTZ(val, "end");
659        coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
660        // Time zone
661        val = resolveAttribute("timezone", eAppXml, evalNofuncs);
662        ParamChecker.checkTimeZone(val, "timezone");
663        coordJob.setTimeZone(val);
664
665        // controls
666        val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalFreq);
667        if (val == "") {
668            val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
669        }
670
671        ival = ParamChecker.checkInteger(val, "timeout");
672        if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
673            ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
674        }
675        coordJob.setTimeout(ival);
676
677        val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
678        if (val == null || val.isEmpty()) {
679            val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
680        }
681        ival = ParamChecker.checkInteger(val, "concurrency");
682        coordJob.setConcurrency(ival);
683
684        val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
685        if (val == null || val.isEmpty()) {
686            int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
687            ival = defaultThrottle;
688        }
689        else {
690            ival = ParamChecker.checkInteger(val, "throttle");
691        }
692        int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
693        float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
694        int maxThrottle = (int) (maxQueue * factor);
695        if (ival > maxThrottle || ival < 1) {
696            ival = maxThrottle;
697        }
698        LOG.debug("max throttle " + ival);
699        coordJob.setMatThrottling(ival);
700
701        val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
702        if (val == "") {
703            val = Execution.FIFO.toString();
704        }
705        coordJob.setExecution(Execution.valueOf(val));
706        String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
707        ParamChecker.isMember(val, acceptedVals, "execution");
708
709        // datasets
710        resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
711        // for each data set
712        resolveDataSets(eAppXml);
713        HashMap<String, String> dataNameList = new HashMap<String, String>();
714        resolveIOEvents(eAppXml, dataNameList);
715
716        resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
717                eAppXml.getNamespace()), evalNofuncs);
718        // TODO: If action or workflow tag is missing, NullPointerException will
719        // occur
720        Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
721                eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
722        evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
723        if (configElem != null) {
724            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
725                resolveTagContents("name", propElem, evalData);
726                // Want to check the data-integrity but don't want to modify the
727                // XML
728                // for properties only
729                Element tmpProp = (Element) propElem.clone();
730                resolveTagContents("value", tmpProp, evalData);
731            }
732        }
733        resolveSLA(eAppXml, coordJob);
734        return eAppXml;
735    }
736
737    /**
738     * Resolve SLA events
739     *
740     * @param eAppXml job XML
741     * @param coordJob coordinator job bean
742     * @throws CommandException thrown if failed to resolve sla events
743     */
744    private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
745        Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace()));
746
747        if (eSla != null) {
748            String slaXml = XmlUtils.prettyPrint(eSla).toString();
749            try {
750                // EL evaluation
751                slaXml = evalSla.evaluate(slaXml, String.class);
752                // Validate against semantic SXD
753                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
754            }
755            catch (Exception e) {
756                throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
757            }
758        }
759    }
760
761    /**
762     * Resolve input-events/data-in and output-events/data-out tags.
763     *
764     * @param eJob : Job element
765     * @throws CoordinatorJobException thrown if failed to resolve input and output events
766     */
767    @SuppressWarnings("unchecked")
768    private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
769        // Resolving input-events/data-in
770        // Clone the job and don't update anything in the original
771        Element eJob = (Element) eJobOrg.clone();
772        Element inputList = eJob.getChild("input-events", eJob.getNamespace());
773        if (inputList != null) {
774            TreeSet<String> eventNameSet = new TreeSet<String>();
775            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
776                String dataInName = dataIn.getAttributeValue("name");
777                dataNameList.put(dataInName, "data-in");
778                // check whether there is any duplicate data-in name
779                if (eventNameSet.contains(dataInName)) {
780                    throw new RuntimeException("Duplicate dataIn name " + dataInName);
781                }
782                else {
783                    eventNameSet.add(dataInName);
784                }
785                resolveTagContents("instance", dataIn, evalInst);
786                resolveTagContents("start-instance", dataIn, evalInst);
787                resolveTagContents("end-instance", dataIn, evalInst);
788            }
789        }
790        // Resolving output-events/data-out
791        Element outputList = eJob.getChild("output-events", eJob.getNamespace());
792        if (outputList != null) {
793            TreeSet<String> eventNameSet = new TreeSet<String>();
794            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
795                String dataOutName = dataOut.getAttributeValue("name");
796                dataNameList.put(dataOutName, "data-out");
797                // check whether there is any duplicate data-out name
798                if (eventNameSet.contains(dataOutName)) {
799                    throw new RuntimeException("Duplicate dataIn name " + dataOutName);
800                }
801                else {
802                    eventNameSet.add(dataOutName);
803                }
804                resolveTagContents("instance", dataOut, evalInst);
805            }
806        }
807
808    }
809
810    /**
811     * Add an attribute into XML element.
812     *
813     * @param attrName :attribute name
814     * @param elem : Element to add attribute
815     * @param value :Value of attribute
816     */
817    private void addAnAttribute(String attrName, Element elem, String value) {
818        elem.setAttribute(attrName, value);
819    }
820
821    /**
822     * Resolve datasets using job configuration.
823     *
824     * @param eAppXml : Job Element XML
825     * @throws Exception thrown if failed to resolve datasets
826     */
827    @SuppressWarnings("unchecked")
828    private void resolveDataSets(Element eAppXml) throws Exception {
829        Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
830        if (datasetList != null) {
831
832            List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
833            resolveDataSets(dsElems);
834            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
835                    eAppXml.getNamespace()), evalNofuncs);
836        }
837    }
838
839    /**
840     * Resolve datasets using job configuration.
841     *
842     * @param dsElems : Data set XML element.
843     * @throws CoordinatorJobException thrown if failed to resolve datasets
844     */
845    private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
846        for (Element dsElem : dsElems) {
847            // Setting up default TimeUnit and EndOFDuraion
848            evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
849            evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
850
851            String val = resolveAttribute("frequency", dsElem, evalFreq);
852            int ival = ParamChecker.checkInteger(val, "frequency");
853            ParamChecker.checkGTZero(ival, "frequency");
854            addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
855                    .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
856            addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
857                    .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
858            val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
859            ParamChecker.checkDateOozieTZ(val, "initial-instance");
860            checkInitialInstance(val);
861            val = resolveAttribute("timezone", dsElem, evalNofuncs);
862            ParamChecker.checkTimeZone(val, "timezone");
863            resolveTagContents("uri-template", dsElem, evalNofuncs);
864            resolveTagContents("done-flag", dsElem, evalNofuncs);
865        }
866    }
867
868    /**
869     * Resolve the content of a tag.
870     *
871     * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
872     * @param elem : Element where the tag exists.
873     * @param eval : EL evealuator
874     * @return Resolved tag content.
875     * @throws CoordinatorJobException thrown if failed to resolve tag content
876     */
877    @SuppressWarnings("unchecked")
878    private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
879        String ret = "";
880        if (elem != null) {
881            for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
882                if (tagElem != null) {
883                    String updated;
884                    try {
885                        updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
886
887                    }
888                    catch (Exception e) {
889                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
890                    }
891                    tagElem.removeContent();
892                    tagElem.addContent(updated);
893                    ret += updated;
894                }
895            }
896        }
897        return ret;
898    }
899
900    /**
901     * Resolve an attribute value.
902     *
903     * @param attrName : Attribute name.
904     * @param elem : XML Element where attribute is defiend
905     * @param eval : ELEvaluator used to resolve
906     * @return Resolved attribute value
907     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
908     */
909    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
910        Attribute attr = elem.getAttribute(attrName);
911        String val = null;
912        if (attr != null) {
913            try {
914                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
915            }
916            catch (Exception e) {
917                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
918            }
919            attr.setValue(val);
920        }
921        return val;
922    }
923
924    /**
925     * Include referred datasets into XML.
926     *
927     * @param resolvedXml : Job XML element.
928     * @param conf : Job configuration
929     * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
930     */
931    @SuppressWarnings("unchecked")
932    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
933        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
934        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
935        List<String> dsList = new ArrayList<String>();
936        if (datasets != null) {
937            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
938                String incDSFile = includeElem.getTextTrim();
939                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
940            }
941            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
942                String dsName = e.getAttributeValue("name");
943                if (dsList.contains(dsName)) {// Override with this DS
944                    // Remove duplicate
945                    removeDataSet(allDataSets, dsName);
946                }
947                else {
948                    dsList.add(dsName);
949                }
950                allDataSets.addContent((Element) e.clone());
951            }
952        }
953        insertDataSet(resolvedXml, allDataSets);
954        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
955    }
956
957    /**
958     * Include one dataset file.
959     *
960     * @param incDSFile : Include data set filename.
961     * @param dsList :List of dataset names to verify the duplicate.
962     * @param allDataSets : Element that includes all dataset definitions.
963     * @param dsNameSpace : Data set name space
964     * @throws CoordinatorJobException thrown if failed to include one dataset file
965     */
966    @SuppressWarnings("unchecked")
967    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
968    throws CoordinatorJobException {
969        Element tmpDataSets = null;
970        try {
971            String dsXml = readDefinition(incDSFile);
972            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
973            tmpDataSets = XmlUtils.parseXml(dsXml);
974        }
975        catch (JDOMException e) {
976            LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
977            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
978        }
979        resolveDataSets(tmpDataSets.getChildren("dataset"));
980        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
981            String dsName = e.getAttributeValue("name");
982            if (dsList.contains(dsName)) {
983                throw new RuntimeException("Duplicate Dataset " + dsName);
984            }
985            dsList.add(dsName);
986            Element tmp = (Element) e.clone();
987            // TODO: Don't like to over-write the external/include DS's namespace
988            tmp.setNamespace(dsNameSpace);
989            tmp.getChild("uri-template").setNamespace(dsNameSpace);
990            if (e.getChild("done-flag") != null) {
991                tmp.getChild("done-flag").setNamespace(dsNameSpace);
992            }
993            allDataSets.addContent(tmp);
994        }
995        // nested include
996        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
997            String incFile = includeElem.getTextTrim();
998            includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
999        }
1000    }
1001
1002    /**
1003     * Remove a dataset from a list of dataset.
1004     *
1005     * @param eDatasets : List of dataset
1006     * @param name : Dataset name to be removed.
1007     */
1008    @SuppressWarnings("unchecked")
1009    private static void removeDataSet(Element eDatasets, String name) {
1010        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1011            if (eDataset.getAttributeValue("name").equals(name)) {
1012                eDataset.detach();
1013                return;
1014            }
1015        }
1016        throw new RuntimeException("undefined dataset: " + name);
1017    }
1018
1019    /**
1020     * Read coordinator definition.
1021     *
1022     * @param appPath application path.
1023     * @return coordinator definition.
1024     * @throws CoordinatorJobException thrown if the definition could not be read.
1025     */
1026    protected String readDefinition(String appPath) throws CoordinatorJobException {
1027        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1028        // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1029        try {
1030            URI uri = new URI(appPath);
1031            LOG.debug("user =" + user);
1032            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1033            Configuration fsConf = has.createJobConf(uri.getAuthority());
1034            FileSystem fs = has.createFileSystem(user, uri, fsConf);
1035            Path appDefPath = null;
1036
1037            // app path could be a directory
1038            Path path = new Path(uri.getPath());
1039            // check file exists for dataset include file, app xml already checked
1040            if (!fs.exists(path)) {
1041                throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1042            }
1043            if (!fs.isFile(path)) {
1044                appDefPath = new Path(path, COORDINATOR_XML_FILE);
1045            } else {
1046                appDefPath = path;
1047            }
1048
1049            Reader reader = new InputStreamReader(fs.open(appDefPath));
1050            StringWriter writer = new StringWriter();
1051            IOUtils.copyCharStream(reader, writer);
1052            return writer.toString();
1053        }
1054        catch (IOException ex) {
1055            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1056            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1057        }
1058        catch (URISyntaxException ex) {
1059            LOG.warn("URISyException :" + ex.getMessage());
1060            throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1061        }
1062        catch (HadoopAccessorException ex) {
1063            throw new CoordinatorJobException(ex);
1064        }
1065        catch (Exception ex) {
1066            LOG.warn("Exception :", ex);
1067            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1068        }
1069    }
1070
1071    /**
1072     * Write a coordinator job into database
1073     *
1074     * @param eJob : XML element of job
1075     * @param coordJob : Coordinator job bean
1076     * @return Job id
1077     * @throws CommandException thrown if unable to save coordinator job to db
1078     */
1079    private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1080        String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1081        coordJob.setId(jobId);
1082
1083        coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1084        coordJob.setCreatedTime(new Date());
1085        coordJob.setUser(conf.get(OozieClient.USER_NAME));
1086        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1087        coordJob.setGroup(group);
1088        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1089        coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1090        coordJob.setLastActionNumber(0);
1091        coordJob.setLastModifiedTime(new Date());
1092
1093        if (!dryrun) {
1094            coordJob.setLastModifiedTime(new Date());
1095            try {
1096                jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
1097            }
1098            catch (JPAExecutorException jpaee) {
1099                coordJob.setId(null);
1100                coordJob.setStatus(CoordinatorJob.Status.FAILED);
1101                throw new CommandException(jpaee);
1102            }
1103        }
1104        return jobId;
1105    }
1106
1107    /*
1108     * this method checks if the initial-instance specified for a particular
1109       is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1110     */
1111    private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1112        Date initialInstance, givenInstance;
1113        try {
1114            initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1115            givenInstance = DateUtils.parseDateOozieTZ(val);
1116        }
1117        catch (Exception e) {
1118            throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1119                                               "' to Date object. ",e);
1120        }
1121        if(givenInstance.compareTo(initialInstance) < 0) {
1122            throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1123                    " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1124        }
1125    }
1126
1127    /* (non-Javadoc)
1128     * @see org.apache.oozie.command.XCommand#getEntityKey()
1129     */
1130    @Override
1131    public String getEntityKey() {
1132        return null;
1133    }
1134
1135    /* (non-Javadoc)
1136     * @see org.apache.oozie.command.XCommand#isLockRequired()
1137     */
1138    @Override
1139    protected boolean isLockRequired() {
1140        return false;
1141    }
1142
1143    /* (non-Javadoc)
1144     * @see org.apache.oozie.command.XCommand#loadState()
1145     */
1146    @Override
1147    protected void loadState() throws CommandException {
1148        jpaService = Services.get().get(JPAService.class);
1149        if (jpaService == null) {
1150            throw new CommandException(ErrorCode.E0610);
1151        }
1152        coordJob = new CoordinatorJobBean();
1153        if (this.bundleId != null) {
1154            // this coord job is created from bundle
1155            coordJob.setBundleId(this.bundleId);
1156            // first use bundle id if submit thru bundle
1157            logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1158            LogUtils.setLogInfo(logInfo);
1159        }
1160        if (this.coordName != null) {
1161            // this coord job is created from bundle
1162            coordJob.setAppName(this.coordName);
1163        }
1164        setJob(coordJob);
1165
1166    }
1167
1168    /* (non-Javadoc)
1169     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1170     */
1171    @Override
1172    protected void verifyPrecondition() throws CommandException {
1173
1174    }
1175
1176    /* (non-Javadoc)
1177     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1178     */
1179    @Override
1180    public void notifyParent() throws CommandException {
1181        // update bundle action
1182        if (coordJob.getBundleId() != null) {
1183            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1184            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1185            bundleStatusUpdate.call();
1186        }
1187    }
1188
1189    /* (non-Javadoc)
1190     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1191     */
1192    @Override
1193    public void updateJob() throws CommandException {
1194    }
1195
1196    /* (non-Javadoc)
1197     * @see org.apache.oozie.command.TransitionXCommand#getJob()
1198     */
1199    @Override
1200    public Job getJob() {
1201        return coordJob;
1202    }
1203
1204    @Override
1205    public void performWrites() throws CommandException {
1206    }
1207}