001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.WorkflowJobBean;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.service.JPAService;
024import org.apache.oozie.service.WorkflowStoreService;
025import org.apache.oozie.service.WorkflowAppService;
026import org.apache.oozie.service.Services;
027import org.apache.oozie.service.DagXLogInfoService;
028import org.apache.oozie.util.InstrumentUtils;
029import org.apache.oozie.util.LogUtils;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.util.ParamChecker;
032import org.apache.oozie.util.XConfiguration;
033import org.apache.oozie.util.XmlUtils;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
036import org.apache.oozie.store.StoreException;
037import org.apache.oozie.workflow.WorkflowApp;
038import org.apache.oozie.workflow.WorkflowException;
039import org.apache.oozie.workflow.WorkflowInstance;
040import org.apache.oozie.workflow.WorkflowLib;
041import org.apache.oozie.util.PropertiesUtils;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.client.WorkflowJob;
044import org.apache.oozie.client.XOozieClient;
045import org.jdom.Element;
046import org.jdom.Namespace;
047
048import java.util.Date;
049import java.util.Map;
050import java.util.Set;
051import java.util.HashSet;
052
053public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
054
055    protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
056    protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
057
058    static {
059        MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
060        MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
061        MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
062
063        OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
064        OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
065    }
066
067    private Configuration conf;
068
069    public SubmitHttpXCommand(String name, String type, Configuration conf) {
070        super(name, type, 1);
071        this.conf = ParamChecker.notNull(conf, "conf");
072    }
073
074    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
075    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
076
077    static {
078        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
079                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
080                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
081                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
082        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
083
084        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
085        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
086        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
087    }
088
089    /**
090     * Generate workflow xml from conf object
091     *
092     * @param conf the configuration object
093     * @return workflow xml def string representation
094     */
095    abstract protected String getWorkflowXml(Configuration conf);
096
097    /* (non-Javadoc)
098     * @see org.apache.oozie.command.XCommand#execute()
099     */
100    @Override
101    protected String execute() throws CommandException {
102        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
103        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
104        try {
105            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
106            String wfXml = getWorkflowXml(conf);
107            LOG.debug("workflow xml created on the server side is :\n");
108            LOG.debug(wfXml);
109            WorkflowApp app = wps.parseDef(wfXml, conf);
110            XConfiguration protoActionConf = wps.createProtoActionConf(conf, false);
111            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
112
113            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
114
115            // Resolving all variables in the job properties.
116            // This ensures the Hadoop Configuration semantics is preserved.
117            XConfiguration resolvedVarsConf = new XConfiguration();
118            for (Map.Entry<String, String> entry : conf) {
119                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
120            }
121            conf = resolvedVarsConf;
122
123            WorkflowInstance wfInstance;
124            try {
125                wfInstance = workflowLib.createInstance(app, conf);
126            }
127            catch (WorkflowException e) {
128                throw new StoreException(e);
129            }
130
131            Configuration conf = wfInstance.getConf();
132
133            WorkflowJobBean workflow = new WorkflowJobBean();
134            workflow.setId(wfInstance.getId());
135            workflow.setAppName(app.getName());
136            workflow.setAppPath(conf.get(OozieClient.APP_PATH));
137            workflow.setConf(XmlUtils.prettyPrint(conf).toString());
138            workflow.setProtoActionConf(protoActionConf.toXmlString());
139            workflow.setCreatedTime(new Date());
140            workflow.setLastModifiedTime(new Date());
141            workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
142            workflow.setStatus(WorkflowJob.Status.PREP);
143            workflow.setRun(0);
144            workflow.setUser(conf.get(OozieClient.USER_NAME));
145            workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
146            workflow.setWorkflowInstance(wfInstance);
147            workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
148
149            LogUtils.setLogInfo(workflow, logInfo);
150            JPAService jpaService = Services.get().get(JPAService.class);
151            if (jpaService != null) {
152                jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
153            }
154            else {
155                LOG.error(ErrorCode.E0610);
156                return null;
157            }
158
159            return workflow.getId();
160        }
161        catch (WorkflowException ex) {
162            throw new CommandException(ex);
163        }
164        catch (Exception ex) {
165            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
166        }
167    }
168
169    static private void addSection(Element X, Namespace ns, String filesStr, String tagName) {
170        if (filesStr != null) {
171            String[] files = filesStr.split(",");
172            for (String f : files) {
173                Element tagElement = new Element(tagName, ns);
174                if (f.contains("#")) {
175                    tagElement.addContent(f);
176                }
177                else {
178                    String filename = f.substring(f.lastIndexOf("/") + 1, f.length());
179                    if (filename == null || filename.isEmpty()) {
180                        tagElement.addContent(f);
181                    }
182                    else {
183                        tagElement.addContent(f + "#" + filename);
184                    }
185                }
186                X.addContent(tagElement);
187            }
188        }
189    }
190
191    /**
192     * Add file section in X.
193     *
194     * @param parent XML element to be appended
195     * @param conf Configuration object
196     * @param ns XML element namespace
197     */
198    static void addFileSection(Element X, Configuration conf, Namespace ns) {
199        String filesStr = conf.get(XOozieClient.FILES);
200        addSection(X, ns, filesStr, "file");
201    }
202
203    /**
204     * Add archive section in X.
205     *
206     * @param parent XML element to be appended
207     * @param conf Configuration object
208     * @param ns XML element namespace
209     */
210    static void addArchiveSection(Element X, Configuration conf, Namespace ns) {
211        String archivesStr = conf.get(XOozieClient.ARCHIVES);
212        addSection(X, ns, archivesStr, "archive");
213    }
214}