001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.oozie;
019
020import org.apache.oozie.client.OozieClientException;
021import org.apache.oozie.action.ActionExecutor;
022import org.apache.oozie.action.ActionExecutorException;
023import org.apache.oozie.DagEngine;
024import org.apache.oozie.LocalOozieClient;
025import org.apache.oozie.WorkflowJobBean;
026import org.apache.oozie.service.DagEngineService;
027import org.apache.oozie.client.WorkflowAction;
028import org.apache.oozie.client.OozieClient;
029import org.apache.oozie.client.WorkflowJob;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.util.ConfigUtils;
032import org.apache.oozie.util.JobUtils;
033import org.apache.oozie.util.PropertiesUtils;
034import org.apache.oozie.util.XmlUtils;
035import org.apache.oozie.util.XConfiguration;
036import org.apache.oozie.util.XLog;
037import org.apache.oozie.service.Services;
038import org.apache.hadoop.conf.Configuration;
039import org.jdom.Element;
040import org.jdom.Namespace;
041
042import java.io.StringReader;
043import java.io.IOException;
044import java.util.Set;
045import java.util.HashSet;
046
047public class SubWorkflowActionExecutor extends ActionExecutor {
048    public static final String ACTION_TYPE = "sub-workflow";
049    public static final String LOCAL = "local";
050    public static final String PARENT_ID = "oozie.wf.parent.id";
051
052    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
053
054    static {
055        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
056                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
057                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
058                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
059
060        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
061        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
062        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
063    }
064
065    protected SubWorkflowActionExecutor() {
066        super(ACTION_TYPE);
067    }
068
069    public void initActionType() {
070        super.initActionType();
071    }
072
073    protected OozieClient getWorkflowClient(Context context, String oozieUri) {
074        OozieClient oozieClient;
075        if (oozieUri.equals(LOCAL)) {
076            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
077            String user = workflow.getUser();
078            String group = workflow.getGroup();
079            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
080            oozieClient = new LocalOozieClient(dagEngine);
081        }
082        else {
083            // TODO we need to add authToken to the WC for the remote case
084            oozieClient = new OozieClient(oozieUri);
085        }
086        return oozieClient;
087    }
088
089    protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException,
090            ActionExecutorException {
091        if (eConf != null) {
092            String strConf = XmlUtils.prettyPrint(eConf).toString();
093            Configuration conf = new XConfiguration(new StringReader(strConf));
094            try {
095                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES);
096            }
097            catch (CommandException ex) {
098                throw convertException(ex);
099            }
100            XConfiguration.copy(conf, subWorkflowConf);
101        }
102    }
103
104    @SuppressWarnings("unchecked")
105    protected void injectCallback(Context context, Configuration conf) {
106        String callback = context.getCallbackUrl("$status");
107        if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) {
108            XLog.getLog(getClass())
109                    .warn("Sub-Workflow configuration has a custom job end notification URI, overriding");
110        }
111        conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback);
112    }
113
114    protected void injectRecovery(String externalId, Configuration conf) {
115        conf.set(OozieClient.EXTERNAL_ID, externalId);
116    }
117
118    protected void injectParent(String parentId, Configuration conf) {
119        conf.set(PARENT_ID, parentId);
120    }
121
122    protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException {
123        String jobId = oozieClient.getJobId(extId);
124        if (jobId.equals("")) {
125            return null;
126        }
127        return jobId;
128    }
129
130    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
131        try {
132            Element eConf = XmlUtils.parseXml(action.getConf());
133            Namespace ns = eConf.getNamespace();
134            Element e = eConf.getChild("oozie", ns);
135            String oozieUri = (e == null) ? LOCAL : e.getTextTrim();
136            OozieClient oozieClient = getWorkflowClient(context, oozieUri);
137            String subWorkflowId = null;
138            String extId = context.getRecoveryId();
139            String runningJobId = null;
140            if (extId != null) {
141                runningJobId = checkIfRunning(oozieClient, extId);
142            }
143            if (runningJobId == null) {
144                String appPath = eConf.getChild("app-path", ns).getTextTrim();
145
146                XConfiguration subWorkflowConf = new XConfiguration();
147                Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
148                if (eConf.getChild(("propagate-configuration"), ns) != null) {
149                    XConfiguration.copy(parentConf, subWorkflowConf);
150                }
151
152                // the proto has the necessary credentials
153                Configuration protoActionConf = context.getProtoActionConf();
154                XConfiguration.copy(protoActionConf, subWorkflowConf);
155                subWorkflowConf.set(OozieClient.APP_PATH, appPath);
156                String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
157                if(group != null) {
158                    subWorkflowConf.set(OozieClient.GROUP_NAME, group);
159                }
160                injectInline(eConf.getChild("configuration", ns), subWorkflowConf);
161                injectCallback(context, subWorkflowConf);
162                injectRecovery(extId, subWorkflowConf);
163                injectParent(context.getWorkflow().getId(), subWorkflowConf);
164
165                //TODO: this has to be refactored later to be done in a single place for REST calls and this
166                JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
167                                          subWorkflowConf);
168
169                subWorkflowId = oozieClient.run(subWorkflowConf.toProperties());
170            }
171            else {
172                subWorkflowId = runningJobId;
173            }
174            WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId);
175            String consoleUrl = workflow.getConsoleUrl();
176            context.setStartData(subWorkflowId, oozieUri, consoleUrl);
177            if (runningJobId != null) {
178                check(context, action);
179            }
180        }
181        catch (Exception ex) {
182            throw convertException(ex);
183        }
184    }
185
186    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
187        try {
188            String externalStatus = action.getExternalStatus();
189            WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK
190                                           : WorkflowAction.Status.ERROR;
191            context.setEndData(status, getActionSignal(status));
192        }
193        catch (Exception ex) {
194            throw convertException(ex);
195        }
196    }
197
198    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
199        try {
200            String subWorkflowId = action.getExternalId();
201            String oozieUri = action.getTrackerUri();
202            OozieClient oozieClient = getWorkflowClient(context, oozieUri);
203            WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId);
204            WorkflowJob.Status status = subWorkflow.getStatus();
205            switch (status) {
206                case FAILED:
207                case KILLED:
208                case SUCCEEDED:
209                    context.setExecutionData(status.toString(), null);
210                    break;
211                default:
212                    context.setExternalStatus(status.toString());
213                    break;
214            }
215        }
216        catch (Exception ex) {
217            throw convertException(ex);
218        }
219    }
220
221    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
222        try {
223            String subWorkflowId = action.getExternalId();
224            String oozieUri = action.getTrackerUri();
225            OozieClient oozieClient = getWorkflowClient(context, oozieUri);
226            oozieClient.kill(subWorkflowId);
227            context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED));
228        }
229        catch (Exception ex) {
230            throw convertException(ex);
231        }
232    }
233
234    private static Set<String> FINAL_STATUS = new HashSet<String>();
235
236    static {
237        FINAL_STATUS.add("SUCCEEDED");
238        FINAL_STATUS.add("KILLED");
239        FINAL_STATUS.add("FAILED");
240    }
241
242    public boolean isCompleted(String externalStatus) {
243        return FINAL_STATUS.contains(externalStatus);
244    }
245}