001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.fs.Path;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.oozie.AppType;
024import org.apache.oozie.SLAEventBean;
025import org.apache.oozie.WorkflowJobBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
028import org.apache.oozie.service.HadoopAccessorException;
029import org.apache.oozie.service.JPAService;
030import org.apache.oozie.service.UUIDService;
031import org.apache.oozie.service.WorkflowStoreService;
032import org.apache.oozie.service.WorkflowAppService;
033import org.apache.oozie.service.HadoopAccessorService;
034import org.apache.oozie.service.Services;
035import org.apache.oozie.service.DagXLogInfoService;
036import org.apache.oozie.util.ConfigUtils;
037import org.apache.oozie.util.ELUtils;
038import org.apache.oozie.util.LogUtils;
039import org.apache.oozie.sla.SLAOperations;
040import org.apache.oozie.util.XLog;
041import org.apache.oozie.util.ParamChecker;
042import org.apache.oozie.util.XConfiguration;
043import org.apache.oozie.util.XmlUtils;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046import org.apache.oozie.executor.jpa.JPAExecutorException;
047import org.apache.oozie.service.ELService;
048import org.apache.oozie.store.StoreException;
049import org.apache.oozie.workflow.WorkflowApp;
050import org.apache.oozie.workflow.WorkflowException;
051import org.apache.oozie.workflow.WorkflowInstance;
052import org.apache.oozie.workflow.WorkflowLib;
053import org.apache.oozie.util.ELEvaluator;
054import org.apache.oozie.util.InstrumentUtils;
055import org.apache.oozie.util.PropertiesUtils;
056import org.apache.oozie.util.db.SLADbOperations;
057import org.apache.oozie.service.SchemaService.SchemaName;
058import org.apache.oozie.client.OozieClient;
059import org.apache.oozie.client.WorkflowJob;
060import org.apache.oozie.client.SLAEvent.SlaAppType;
061import org.apache.oozie.client.rest.JsonBean;
062import org.jdom.Element;
063import java.util.ArrayList;
064import java.util.Date;
065import java.util.List;
066import java.util.Map;
067import java.util.Set;
068import java.util.HashSet;
069import java.io.IOException;
070import java.net.URI;
071
072@SuppressWarnings("deprecation")
073public class SubmitXCommand extends WorkflowXCommand<String> {
074    public static final String CONFIG_DEFAULT = "config-default.xml";
075
076    private Configuration conf;
077    private List<JsonBean> insertList = new ArrayList<JsonBean>();
078    private String parentId;
079
080    /**
081     * Constructor to create the workflow Submit Command.
082     *
083     * @param conf : Configuration for workflow job
084     */
085    public SubmitXCommand(Configuration conf) {
086        super("submit", "submit", 1);
087        this.conf = ParamChecker.notNull(conf, "conf");
088    }
089
090    /**
091     * Constructor for submitting wf through coordinator
092     *
093     * @param conf : Configuration for workflow job
094     * @param parentId: the coord action id
095     */
096    public SubmitXCommand(Configuration conf, String parentId) {
097        this(conf);
098        this.parentId = parentId;
099    }
100
101    /**
102     * Constructor to create the workflow Submit Command.
103     *
104     * @param dryrun : if dryrun
105     * @param conf : Configuration for workflow job
106     * @param authToken : To be used for authentication
107     */
108    public SubmitXCommand(boolean dryrun, Configuration conf) {
109        this(conf);
110        this.dryrun = dryrun;
111    }
112
113    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
114    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
115
116    static {
117        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
118                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
119                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
120                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
121        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
122
123        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
124        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
125        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
126    }
127
128    @Override
129    protected String execute() throws CommandException {
130        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
131        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
132        try {
133            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
134            WorkflowApp app = wps.parseDef(conf);
135            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
136            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
137
138            String user = conf.get(OozieClient.USER_NAME);
139            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
140            URI uri = new URI(conf.get(OozieClient.APP_PATH));
141            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
142            Configuration fsConf = has.createJobConf(uri.getAuthority());
143            FileSystem fs = has.createFileSystem(user, uri, fsConf);
144
145            Path configDefault = null;
146            // app path could be a directory
147            Path path = new Path(uri.getPath());
148            if (!fs.isFile(path)) {
149                configDefault = new Path(path, CONFIG_DEFAULT);
150            } else {
151                configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
152            }
153
154            if (fs.exists(configDefault)) {
155                try {
156                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
157                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
158                    XConfiguration.injectDefaults(defaultConf, conf);
159                }
160                catch (IOException ex) {
161                    throw new IOException("default configuration file, " + ex.getMessage(), ex);
162                }
163            }
164
165            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
166
167            // Resolving all variables in the job properties.
168            // This ensures the Hadoop Configuration semantics is preserved.
169            XConfiguration resolvedVarsConf = new XConfiguration();
170            for (Map.Entry<String, String> entry : conf) {
171                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
172            }
173            conf = resolvedVarsConf;
174
175            WorkflowInstance wfInstance;
176            try {
177                wfInstance = workflowLib.createInstance(app, conf);
178            }
179            catch (WorkflowException e) {
180                throw new StoreException(e);
181            }
182
183            Configuration conf = wfInstance.getConf();
184            // System.out.println("WF INSTANCE CONF:");
185            // System.out.println(XmlUtils.prettyPrint(conf).toString());
186
187            WorkflowJobBean workflow = new WorkflowJobBean();
188            workflow.setId(wfInstance.getId());
189            workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
190            workflow.setAppPath(conf.get(OozieClient.APP_PATH));
191            workflow.setConf(XmlUtils.prettyPrint(conf).toString());
192            workflow.setProtoActionConf(protoActionConf.toXmlString());
193            workflow.setCreatedTime(new Date());
194            workflow.setLastModifiedTime(new Date());
195            workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
196            workflow.setStatus(WorkflowJob.Status.PREP);
197            workflow.setRun(0);
198            workflow.setUser(conf.get(OozieClient.USER_NAME));
199            workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
200            workflow.setWorkflowInstance(wfInstance);
201            workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
202            // Set parent id if it doesn't already have one (for subworkflows)
203            if (workflow.getParentId() == null) {
204                workflow.setParentId(conf.get(SubWorkflowActionExecutor.PARENT_ID));
205            }
206            // Set to coord action Id if workflow submitted through coordinator
207            if (workflow.getParentId() == null) {
208                workflow.setParentId(parentId);
209            }
210
211            LogUtils.setLogInfo(workflow, logInfo);
212            LOG = XLog.resetPrefix(LOG);
213            LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
214            Element wfElem = XmlUtils.parseXml(app.getDefinition());
215            ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
216            String jobSlaXml = verifySlaElements(wfElem, evalSla);
217            if (!dryrun) {
218                writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
219                        workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
220                workflow.setSlaXml(jobSlaXml);
221                // System.out.println("SlaXml :"+ slaXml);
222
223                //store.insertWorkflow(workflow);
224                insertList.add(workflow);
225                JPAService jpaService = Services.get().get(JPAService.class);
226                if (jpaService != null) {
227                    try {
228                        jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
229                    }
230                    catch (JPAExecutorException je) {
231                        throw new CommandException(je);
232                    }
233                }
234                else {
235                    LOG.error(ErrorCode.E0610);
236                    return null;
237                }
238
239                return workflow.getId();
240            }
241            else {
242                return "OK";
243            }
244        }
245        catch (WorkflowException ex) {
246            throw new CommandException(ex);
247        }
248        catch (HadoopAccessorException ex) {
249            throw new CommandException(ex);
250        }
251        catch (Exception ex) {
252            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
253        }
254    }
255
256    private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
257        String jobSlaXml = "";
258        // Validate WF job
259        Element eSla = XmlUtils.getSLAElement(eWfJob);
260        if (eSla != null) {
261            jobSlaXml = resolveSla(eSla, evalSla);
262        }
263
264        // Validate all actions
265        for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
266            eSla = XmlUtils.getSLAElement(eWfJob);
267            if (eSla != null) {
268                resolveSla(eSla, evalSla);
269            }
270        }
271        return jobSlaXml;
272    }
273
274    private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
275            String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
276        try {
277            if (slaXml != null && slaXml.length() > 0) {
278                Element eSla = XmlUtils.parseXml(slaXml);
279                SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
280                        SlaAppType.WORKFLOW_JOB, user, group, log);
281                if(slaEvent != null) {
282                    insertList.add(slaEvent);
283                }
284                // insert into new table
285                SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
286                        log, false);
287            }
288            // Add sla for wf actions
289            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
290                Element actionSla = XmlUtils.getSLAElement(action);
291                if (actionSla != null) {
292                    String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
293                    actionSla = XmlUtils.parseXml(actionSlaXml);
294                    String actionId = Services.get().get(UUIDService.class)
295                            .generateChildId(jobId, action.getAttributeValue("name") + "");
296                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
297                            user, appName, log, false);
298                }
299            }
300        }
301        catch (Exception e) {
302            e.printStackTrace();
303            throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
304        }
305    }
306
307    /**
308     * Resolve variables in sla xml element.
309     *
310     * @param eSla sla xml element
311     * @param evalSla sla evaluator
312     * @return sla xml string after evaluation
313     * @throws CommandException
314     */
315    public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
316        // EL evaluation
317        String slaXml = XmlUtils.prettyPrint(eSla).toString();
318        try {
319            slaXml = XmlUtils.removeComments(slaXml);
320            slaXml = evalSla.evaluate(slaXml, String.class);
321            XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
322            return slaXml;
323        }
324        catch (Exception e) {
325            throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
326        }
327    }
328
329    /**
330     * Create an EL evaluator for a given group.
331     *
332     * @param conf configuration variable
333     * @param group group variable
334     * @return the evaluator created for the group
335     */
336    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
337        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
338        for (Map.Entry<String, String> entry : conf) {
339            eval.setVariable(entry.getKey(), entry.getValue());
340        }
341        return eval;
342    }
343
344    @Override
345    public String getEntityKey() {
346        return null;
347    }
348
349    @Override
350    protected boolean isLockRequired() {
351        return false;
352    }
353
354    @Override
355    protected void loadState() {
356
357    }
358
359    @Override
360    protected void verifyPrecondition() throws CommandException {
361
362    }
363
364}