001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.io.IOException;
021import java.io.InputStreamReader;
022import java.io.Reader;
023import java.io.StringReader;
024import java.io.StringWriter;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.Date;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import javax.xml.transform.stream.StreamSource;
034import javax.xml.validation.Validator;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.oozie.BundleJobBean;
040import org.apache.oozie.ErrorCode;
041import org.apache.oozie.client.Job;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.command.CommandException;
044import org.apache.oozie.command.PreconditionException;
045import org.apache.oozie.command.SubmitTransitionXCommand;
046import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047import org.apache.oozie.service.HadoopAccessorException;
048import org.apache.oozie.service.HadoopAccessorService;
049import org.apache.oozie.service.JPAService;
050import org.apache.oozie.service.SchemaService;
051import org.apache.oozie.service.Services;
052import org.apache.oozie.service.UUIDService;
053import org.apache.oozie.service.SchemaService.SchemaName;
054import org.apache.oozie.service.UUIDService.ApplicationType;
055import org.apache.oozie.util.ConfigUtils;
056import org.apache.oozie.util.DateUtils;
057import org.apache.oozie.util.ELEvaluator;
058import org.apache.oozie.util.ELUtils;
059import org.apache.oozie.util.IOUtils;
060import org.apache.oozie.util.InstrumentUtils;
061import org.apache.oozie.util.LogUtils;
062import org.apache.oozie.util.ParamChecker;
063import org.apache.oozie.util.PropertiesUtils;
064import org.apache.oozie.util.XConfiguration;
065import org.apache.oozie.util.XmlUtils;
066import org.apache.oozie.util.ParameterVerifier;
067import org.jdom.Attribute;
068import org.jdom.Element;
069import org.jdom.JDOMException;
070import org.xml.sax.SAXException;
071
072/**
073 * This Command will submit the bundle.
074 */
075public class BundleSubmitXCommand extends SubmitTransitionXCommand {
076
077    private Configuration conf;
078    public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
079    public static final String BUNDLE_XML_FILE = "bundle.xml";
080    private final BundleJobBean bundleBean = new BundleJobBean();
081    private String jobId;
082    private JPAService jpaService = null;
083
084    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
085    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
086
087    static {
088        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
089                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
090                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
091                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
092                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
093        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
094
095        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
096        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098    }
099
100    /**
101     * Constructor to create the bundle submit command.
102     *
103     * @param conf configuration for bundle job
104     */
105    public BundleSubmitXCommand(Configuration conf) {
106        super("bundle_submit", "bundle_submit", 1);
107        this.conf = ParamChecker.notNull(conf, "conf");
108    }
109
110    /**
111     * Constructor to create the bundle submit command.
112     *
113     * @param dryrun true if dryrun is enable
114     * @param conf configuration for bundle job
115     */
116    public BundleSubmitXCommand(boolean dryrun, Configuration conf) {
117        this(conf);
118        this.dryrun = dryrun;
119    }
120
121    /* (non-Javadoc)
122     * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
123     */
124    @Override
125    protected String submit() throws CommandException {
126        LOG.info("STARTED Bundle Submit");
127        try {
128            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
129
130            ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
131            
132            String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
133            // Resolving all variables in the job properties.
134            // This ensures the Hadoop Configuration semantics is preserved.
135            XConfiguration resolvedVarsConf = new XConfiguration();
136            for (Map.Entry<String, String> entry : conf) {
137                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
138            }
139            conf = resolvedVarsConf;
140
141            String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf);
142
143            //verify the uniqueness of coord names
144            verifyCoordNameUnique(resolvedJobXml);
145            this.jobId = storeToDB(bundleBean, resolvedJobXml);
146            LogUtils.setLogInfo(bundleBean, logInfo);
147
148            if (dryrun) {
149                Date startTime = bundleBean.getStartTime();
150                long startTimeMilli = startTime.getTime();
151                long endTimeMilli = startTimeMilli + (3600 * 1000);
152                Date jobEndTime = bundleBean.getEndTime();
153                Date endTime = new Date(endTimeMilli);
154                if (endTime.compareTo(jobEndTime) > 0) {
155                    endTime = jobEndTime;
156                }
157                jobId = bundleBean.getId();
158                LOG.info("[" + jobId + "]: Update status to PREP");
159                bundleBean.setStatus(Job.Status.PREP);
160                try {
161                    new XConfiguration(new StringReader(bundleBean.getConf()));
162                }
163                catch (IOException e1) {
164                    LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
165                }
166                String output = bundleBean.getJobXml() + System.getProperty("line.separator");
167                return output;
168            }
169            else {
170                if (bundleBean.getKickoffTime() == null) {
171                    // If there is no KickOffTime, default kickoff is NOW.
172                    LOG.debug("Since kickoff time is not defined for job id " + jobId
173                            + ". Queuing and BundleStartXCommand immediately after submission");
174                    queue(new BundleStartXCommand(jobId));
175                }
176            }
177        }
178        catch (Exception ex) {
179            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
180        }
181        LOG.info("ENDED Bundle Submit");
182        return this.jobId;
183    }
184
185    /* (non-Javadoc)
186     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
187     */
188    @Override
189    public void notifyParent() throws CommandException {
190    }
191
192    /* (non-Javadoc)
193     * @see org.apache.oozie.command.XCommand#getEntityKey()
194     */
195    @Override
196    public String getEntityKey() {
197        return null;
198    }
199
200    /* (non-Javadoc)
201     * @see org.apache.oozie.command.XCommand#isLockRequired()
202     */
203    @Override
204    protected boolean isLockRequired() {
205        return false;
206    }
207
208    /* (non-Javadoc)
209     * @see org.apache.oozie.command.XCommand#loadState()
210     */
211    @Override
212    protected void loadState() throws CommandException {
213    }
214
215    /* (non-Javadoc)
216     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
217     */
218    @Override
219    protected void verifyPrecondition() throws CommandException, PreconditionException {
220    }
221
222    /* (non-Javadoc)
223     * @see org.apache.oozie.command.XCommand#eagerLoadState()
224     */
225    @Override
226    protected void eagerLoadState() throws CommandException {
227        super.eagerLoadState();
228        jpaService = Services.get().get(JPAService.class);
229        if (jpaService == null) {
230            throw new CommandException(ErrorCode.E0610);
231        }
232    }
233
234    /* (non-Javadoc)
235     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
236     */
237    @Override
238    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
239        try {
240            super.eagerVerifyPrecondition();
241            mergeDefaultConfig();
242            String appXml = readAndValidateXml();
243            bundleBean.setOrigJobXml(appXml);
244            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
245        }
246        catch (BundleJobException ex) {
247            LOG.warn("BundleJobException:  ", ex);
248            throw new CommandException(ex);
249        }
250        catch (IllegalArgumentException iex) {
251            LOG.warn("IllegalArgumentException:  ", iex);
252            throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex);
253        }
254        catch (Exception ex) {
255            LOG.warn("Exception:  ", ex);
256            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
257        }
258    }
259
260    /**
261     * Merge default configuration with user-defined configuration.
262     *
263     * @throws CommandException thrown if failed to merge configuration
264     */
265    protected void mergeDefaultConfig() throws CommandException {
266        Path configDefault = null;
267        try {
268            String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
269            Path bundleAppPath = new Path(bundleAppPathStr);
270            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
271            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
272            Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
273            FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
274
275            // app path could be a directory
276            if (!fs.isFile(bundleAppPath)) {
277                configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
278            } else {
279                configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
280            }
281
282            if (fs.exists(configDefault)) {
283                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
284                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
285                XConfiguration.injectDefaults(defaultConf, conf);
286            }
287            else {
288                LOG.info("configDefault Doesn't exist " + configDefault);
289            }
290            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
291        }
292        catch (IOException e) {
293            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
294                    + configDefault, e);
295        }
296        catch (HadoopAccessorException e) {
297            throw new CommandException(e);
298        }
299        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
300    }
301
302    /**
303     * Read the application XML and validate against bundle Schema
304     *
305     * @return validated bundle XML
306     * @throws BundleJobException thrown if failed to read or validate xml
307     */
308    private String readAndValidateXml() throws BundleJobException {
309        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
310        String bundleXml = readDefinition(appPath);
311        validateXml(bundleXml);
312        return bundleXml;
313    }
314
315    /**
316     * Read bundle definition.
317     *
318     * @param appPath application path.
319     * @param user user name.
320     * @param group group name.
321     * @return bundle definition.
322     * @throws BundleJobException thrown if the definition could not be read.
323     */
324    protected String readDefinition(String appPath) throws BundleJobException {
325        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
326        //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
327        try {
328            URI uri = new URI(appPath);
329            LOG.debug("user =" + user);
330            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
331            Configuration fsConf = has.createJobConf(uri.getAuthority());
332            FileSystem fs = has.createFileSystem(user, uri, fsConf);
333            Path appDefPath = null;
334
335            // app path could be a directory
336            Path path = new Path(uri.getPath());
337            if (!fs.isFile(path)) {
338                appDefPath = new Path(path, BUNDLE_XML_FILE);
339            } else {
340                appDefPath = path;
341            }
342
343            Reader reader = new InputStreamReader(fs.open(appDefPath));
344            StringWriter writer = new StringWriter();
345            IOUtils.copyCharStream(reader, writer);
346            return writer.toString();
347        }
348        catch (IOException ex) {
349            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
350            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
351        }
352        catch (URISyntaxException ex) {
353            LOG.warn("URISyException :" + ex.getMessage());
354            throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
355        }
356        catch (HadoopAccessorException ex) {
357            throw new BundleJobException(ex);
358        }
359        catch (Exception ex) {
360            LOG.warn("Exception :", ex);
361            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
362        }
363    }
364
365    /**
366     * Validate against Bundle XSD file
367     *
368     * @param xmlContent input bundle xml
369     * @throws BundleJobException thrown if failed to validate xml
370     */
371    private void validateXml(String xmlContent) throws BundleJobException {
372        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
373        Validator validator = schema.newValidator();
374        try {
375            validator.validate(new StreamSource(new StringReader(xmlContent)));
376        }
377        catch (SAXException ex) {
378            LOG.warn("SAXException :", ex);
379            throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
380        }
381        catch (IOException ex) {
382            LOG.warn("IOException :", ex);
383            throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
384        }
385    }
386
387    /**
388     * Write a Bundle Job into database
389     *
390     * @param Bundle job bean
391     * @return job id
392     * @throws CommandException thrown if failed to store bundle job bean to db
393     */
394    private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
395        try {
396            jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
397
398            bundleJob.setId(jobId);
399            String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
400            name = ELUtils.resolveAppName(name, conf);
401            bundleJob.setAppName(name);
402            bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
403            // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
404            bundleJob.setCreatedTime(new Date());
405            bundleJob.setUser(conf.get(OozieClient.USER_NAME));
406            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
407            bundleJob.setGroup(group);
408            bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
409            bundleJob.setJobXml(resolvedJobXml);
410            Element jobElement = XmlUtils.parseXml(resolvedJobXml);
411            Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
412            if (controlsElement != null) {
413                Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
414                if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
415                    Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
416                    bundleJob.setKickoffTime(kickoffTime);
417                }
418            }
419            bundleJob.setLastModifiedTime(new Date());
420
421            if (!dryrun) {
422                jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
423            }
424        }
425        catch (Exception ex) {
426            throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
427        }
428        return jobId;
429    }
430
431    /* (non-Javadoc)
432     * @see org.apache.oozie.command.TransitionXCommand#getJob()
433     */
434    @Override
435    public Job getJob() {
436        return bundleBean;
437    }
438
439    /**
440     * Resolve job xml with conf
441     *
442     * @param bundleXml bundle job xml
443     * @param conf job configuration
444     * @return resolved job xml
445     * @throws BundleJobException thrown if failed to resolve variables
446     */
447    private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
448        try {
449            ELEvaluator eval = createEvaluator(conf);
450            return eval.evaluate(bundleXml, String.class);
451        }
452        catch (Exception e) {
453            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
454        }
455    }
456
457    /**
458     * Create ELEvaluator
459     *
460     * @param conf job configuration
461     * @return ELEvaluator the evaluator for el function
462     * @throws BundleJobException thrown if failed to create evaluator
463     */
464    public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
465        ELEvaluator eval;
466        ELEvaluator.Context context;
467        try {
468            context = new ELEvaluator.Context();
469            eval = new ELEvaluator(context);
470            for (Map.Entry<String, String> entry : conf) {
471                eval.setVariable(entry.getKey(), entry.getValue());
472            }
473        }
474        catch (Exception e) {
475            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
476        }
477        return eval;
478    }
479
480    /**
481     * Verify the uniqueness of coordinator names
482     *
483     * @param resolved job xml
484     * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
485     */
486    @SuppressWarnings("unchecked")
487    private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
488        Set<String> set = new HashSet<String>();
489        try {
490            Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
491            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
492            for (Element elem : coordElems) {
493                Attribute name = elem.getAttribute("name");
494                if (name != null) {
495                    if (set.contains(name.getValue())) {
496                        throw new CommandException(ErrorCode.E1304, name);
497                    }
498                    set.add(name.getValue());
499                }
500                else {
501                    throw new CommandException(ErrorCode.E1305);
502                }
503            }
504        }
505        catch (JDOMException jex) {
506            throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
507        }
508
509        return null;
510    }
511
512    /* (non-Javadoc)
513     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
514     */
515    @Override
516    public void updateJob() throws CommandException {
517    }
518
519    @Override
520    public void performWrites() throws CommandException {
521    }
522}