001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Date;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.oozie.AppType;
036import org.apache.oozie.ErrorCode;
037import org.apache.oozie.WorkflowActionBean;
038import org.apache.oozie.WorkflowJobBean;
039import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
040import org.apache.oozie.client.OozieClient;
041import org.apache.oozie.client.WorkflowAction;
042import org.apache.oozie.client.WorkflowJob;
043import org.apache.oozie.client.rest.JsonBean;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.command.PreconditionException;
046import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
049import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
050import org.apache.oozie.service.DagXLogInfoService;
051import org.apache.oozie.service.HadoopAccessorException;
052import org.apache.oozie.service.HadoopAccessorService;
053import org.apache.oozie.service.JPAService;
054import org.apache.oozie.service.Services;
055import org.apache.oozie.service.UUIDService;
056import org.apache.oozie.service.WorkflowAppService;
057import org.apache.oozie.service.WorkflowStoreService;
058import org.apache.oozie.sla.SLAOperations;
059import org.apache.oozie.sla.service.SLAService;
060import org.apache.oozie.util.ConfigUtils;
061import org.apache.oozie.util.ELEvaluator;
062import org.apache.oozie.util.ELUtils;
063import org.apache.oozie.util.InstrumentUtils;
064import org.apache.oozie.util.LogUtils;
065import org.apache.oozie.util.ParamChecker;
066import org.apache.oozie.util.PropertiesUtils;
067import org.apache.oozie.util.XConfiguration;
068import org.apache.oozie.util.XLog;
069import org.apache.oozie.util.XmlUtils;
070import org.apache.oozie.workflow.WorkflowApp;
071import org.apache.oozie.workflow.WorkflowException;
072import org.apache.oozie.workflow.WorkflowInstance;
073import org.apache.oozie.workflow.WorkflowLib;
074import org.apache.oozie.workflow.lite.NodeHandler;
075import org.jdom.Element;
076import org.jdom.JDOMException;
077
/**
 * Command that re-runs a workflow job which has already finished (failed, killed or succeeded),
 * optionally skipping nodes that completed in the previous run.
 */
082public class ReRunXCommand extends WorkflowXCommand<Void> {
083    private final String jobId;
084    private Configuration conf;
085    private final Set<String> nodesToSkip = new HashSet<String>();
086    public static final String TO_SKIP = "TO_SKIP";
087    private WorkflowJobBean wfBean;
088    private List<WorkflowActionBean> actions;
089    private JPAService jpaService;
090    private List<JsonBean> updateList = new ArrayList<JsonBean>();
091    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
092
093    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
094    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
095
096    static {
097        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
098                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
099                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
100                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
101        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
102
103        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
104        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
105        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
106    }
107
108    public ReRunXCommand(String jobId, Configuration conf) {
109        super("rerun", "rerun", 1);
110        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
111        this.conf = ParamChecker.notNull(conf, "conf");
112    }
113
114    /* (non-Javadoc)
115     * @see org.apache.oozie.command.XCommand#execute()
116     */
117    @Override
118    protected Void execute() throws CommandException {
119        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
120        LogUtils.setLogInfo(wfBean, logInfo);
121        WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
122        WorkflowInstance newWfInstance;
123        String appPath = null;
124
125        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
126        try {
127            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
128            WorkflowApp app = wps.parseDef(conf);
129            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
130            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
131
132            appPath = conf.get(OozieClient.APP_PATH);
133            URI uri = new URI(appPath);
134            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
135            Configuration fsConf = has.createJobConf(uri.getAuthority());
136            FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);
137
138            Path configDefault = null;
139            // app path could be a directory
140            Path path = new Path(uri.getPath());
141            if (!fs.isFile(path)) {
142                configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
143            } else {
144                configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
145            }
146
147            if (fs.exists(configDefault)) {
148                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
149                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
150                XConfiguration.injectDefaults(defaultConf, conf);
151            }
152
153            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
154
155            // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are preserved.
156            // The Configuration.get function within XConfiguration.resolve() works recursively to get the final value corresponding to a key in the map
157            // Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
158            conf = ((XConfiguration) conf).resolve();
159
160            try {
161                newWfInstance = workflowLib.createInstance(app, conf, jobId);
162            }
163            catch (WorkflowException e) {
164                throw new CommandException(e);
165            }
166            String appName = ELUtils.resolveAppName(app.getName(), conf);
167            if (SLAService.isEnabled()) {
168                Element wfElem = XmlUtils.parseXml(app.getDefinition());
169                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
170                Element eSla = XmlUtils.getSLAElement(wfElem);
171                String jobSlaXml = null;
172                if (eSla != null) {
173                    jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
174                }
175                writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
176                            conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
177                            evalSla);
178            }
179            wfBean.setAppName(appName);
180            wfBean.setProtoActionConf(protoActionConf.toXmlString());
181        }
182        catch (WorkflowException ex) {
183            throw new CommandException(ex);
184        }
185        catch (IOException ex) {
186            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
187        }
188        catch (HadoopAccessorException ex) {
189            throw new CommandException(ex);
190        }
191        catch (URISyntaxException ex) {
192            throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
193        }
194        catch (Exception ex) {
195            throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
196        }
197
198        for (int i = 0; i < actions.size(); i++) {
199            if (!nodesToSkip.contains(actions.get(i).getName())) {
200                deleteList.add(actions.get(i));
201                LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
202            }
203            else {
204                copyActionData(newWfInstance, oldWfInstance);
205            }
206        }
207
208        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
209        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
210        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
211        wfBean.setUser(conf.get(OozieClient.USER_NAME));
212        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
213        wfBean.setGroup(group);
214        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
215        wfBean.setEndTime(null);
216        wfBean.setRun(wfBean.getRun() + 1);
217        wfBean.setStatus(WorkflowJob.Status.PREP);
218        wfBean.setWorkflowInstance(newWfInstance);
219
220        try {
221            wfBean.setLastModifiedTime(new Date());
222            updateList.add(wfBean);
223            // call JPAExecutor to do the bulk writes
224            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
225        }
226        catch (JPAExecutorException je) {
227            throw new CommandException(je);
228        }
229
230        return null;
231    }
232
233
234    @SuppressWarnings("unchecked")
235        private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
236            String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
237        if (jobSlaXml != null && jobSlaXml.length() > 0) {
238            Element eSla = XmlUtils.parseXml(jobSlaXml);
239            // insert into new table
240            SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
241                    true);
242        }
243        // Add sla for wf actions
244        for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
245            Element actionSla = XmlUtils.getSLAElement(action);
246            if (actionSla != null) {
247                String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
248                actionSla = XmlUtils.parseXml(actionSlaXml);
249                if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
250                    String actionId = Services.get().get(UUIDService.class)
251                            .generateChildId(jobId, action.getAttributeValue("name") + "");
252                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
253                            appName, LOG, true);
254                }
255            }
256        }
257
258    }
259
260    /**
261     * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
262     * skipped node list
263     *
264     * @throws CommandException
265     */
266    @Override
267    protected void eagerLoadState() throws CommandException {
268        super.eagerLoadState();
269        try {
270            jpaService = Services.get().get(JPAService.class);
271            if (jpaService != null) {
272                this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
273                this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
274            }
275            else {
276                throw new CommandException(ErrorCode.E0610);
277            }
278
279            if (conf != null) {
280                if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
281                    Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
282                    for (String str : skipNodes) {
283                        // trimming is required
284                        nodesToSkip.add(str.trim());
285                    }
286                    LOG.debug("Skipnode size :" + nodesToSkip.size());
287                }
288                else {
289                    for (WorkflowActionBean action : actions) { // Rerun from failed nodes
290                        if (action.getStatus() == WorkflowAction.Status.OK) {
291                            nodesToSkip.add(action.getName());
292                        }
293                    }
294                    LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
295                }
296                StringBuilder tmp = new StringBuilder();
297                for (String node : nodesToSkip) {
298                    tmp.append(node).append(",");
299                }
300                LOG.debug("SkipNode List :" + tmp);
301            }
302        }
303        catch (Exception ex) {
304            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
305        }
306    }
307
308    /**
309     * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
310     * The nodes that are to be skipped are to be completed successfully in the base run.
311     *
312     * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions
313     */
314    @Override
315    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
316        super.eagerVerifyPrecondition();
317        if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
318                || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
319                        WorkflowJob.Status.SUCCEEDED))) {
320            throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
321        }
322        Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
323        for (WorkflowActionBean action : actions) {
324            if (nodesToSkip.contains(action.getName())) {
325                if (!action.getStatus().equals(WorkflowAction.Status.OK)
326                        && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
327                    throw new CommandException(ErrorCode.E0806, action.getName());
328                }
329                unmachedNodes.remove(action.getName());
330            }
331        }
332        if (unmachedNodes.size() > 0) {
333            StringBuilder sb = new StringBuilder();
334            String separator = "";
335            for (String s : unmachedNodes) {
336                sb.append(separator).append(s);
337                separator = ",";
338            }
339            throw new CommandException(ErrorCode.E0807, sb);
340        }
341    }
342
343    /**
344     * Copys the variables for skipped nodes from the old wfInstance to new one.
345     *
346     * @param newWfInstance : Source WF instance object
347     * @param oldWfInstance : Update WF instance
348     */
349    private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
350        Map<String, String> oldVars = new HashMap<String, String>();
351        Map<String, String> newVars = new HashMap<String, String>();
352        oldVars = oldWfInstance.getAllVars();
353        for (String var : oldVars.keySet()) {
354            String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
355            if (nodesToSkip.contains(actionName)) {
356                newVars.put(var, oldVars.get(var));
357            }
358        }
359        for (String node : nodesToSkip) {
360            // Setting the TO_SKIP variable to true. This will be used by
361            // SignalCommand and LiteNodeHandler to skip the action.
362            newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
363            String visitedFlag = NodeHandler.getLoopFlag(node);
364            // Removing the visited flag so that the action won't be considered
365            // a loop.
366            if (newVars.containsKey(visitedFlag)) {
367                newVars.remove(visitedFlag);
368            }
369        }
370        newWfInstance.setAllVars(newVars);
371    }
372
373    /* (non-Javadoc)
374     * @see org.apache.oozie.command.XCommand#getEntityKey()
375     */
376    @Override
377    public String getEntityKey() {
378        return this.jobId;
379    }
380
381    /* (non-Javadoc)
382     * @see org.apache.oozie.command.XCommand#isLockRequired()
383     */
384    @Override
385    protected boolean isLockRequired() {
386        return true;
387    }
388
389    /* (non-Javadoc)
390     * @see org.apache.oozie.command.XCommand#loadState()
391     */
392    @Override
393    protected void loadState() throws CommandException {
394        eagerLoadState();
395    }
396
397    /* (non-Javadoc)
398     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
399     */
400    @Override
401    protected void verifyPrecondition() throws CommandException, PreconditionException {
402        eagerVerifyPrecondition();
403    }
404}