001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.BufferedReader;
021import java.io.ByteArrayOutputStream;
022import java.io.File;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.InputStreamReader;
027import java.io.PrintStream;
028import java.io.StringReader;
029import java.net.ConnectException;
030import java.net.URI;
031import java.net.URISyntaxException;
032import java.net.UnknownHostException;
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.Properties;
040import java.util.Set;
041import java.util.Map.Entry;
042
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.filecache.DistributedCache;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.fs.permission.AccessControlException;
049import org.apache.hadoop.mapred.JobClient;
050import org.apache.hadoop.mapred.JobConf;
051import org.apache.hadoop.mapred.JobID;
052import org.apache.hadoop.mapred.RunningJob;
053import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
054import org.apache.hadoop.util.DiskChecker;
055import org.apache.oozie.WorkflowActionBean;
056import org.apache.oozie.WorkflowJobBean;
057import org.apache.oozie.action.ActionExecutor;
058import org.apache.oozie.action.ActionExecutorException;
059import org.apache.oozie.client.OozieClient;
060import org.apache.oozie.client.WorkflowAction;
061import org.apache.oozie.service.HadoopAccessorException;
062import org.apache.oozie.service.HadoopAccessorService;
063import org.apache.oozie.service.Services;
064import org.apache.oozie.service.URIHandlerService;
065import org.apache.oozie.service.WorkflowAppService;
066import org.apache.oozie.servlet.CallbackServlet;
067import org.apache.oozie.util.ELEvaluator;
068import org.apache.oozie.util.IOUtils;
069import org.apache.oozie.util.PropertiesUtils;
070import org.apache.oozie.util.XConfiguration;
071import org.apache.oozie.util.XLog;
072import org.apache.oozie.util.XmlUtils;
073import org.jdom.Element;
074import org.jdom.JDOMException;
075import org.jdom.Namespace;
076import org.apache.hadoop.security.token.Token;
077import org.apache.hadoop.security.token.TokenIdentifier;
078
079public class JavaActionExecutor extends ActionExecutor {
080
081    private static final String HADOOP_USER = "user.name";
082    public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
083    public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
084    public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
085    public static final String HADOOP_NAME_NODE = "fs.default.name";
086    private static final String HADOOP_JOB_NAME = "mapred.job.name";
087    public static final String OOZIE_COMMON_LIBDIR = "oozie";
088    public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
089    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
090    public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
091    public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
092    public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
093    private static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
094    public static final String OOZIE_ACTION_SHIP_LAUNCHER_JAR = "oozie.action.ship.launcher.jar";
095    private boolean useLauncherJar;
096    private static int maxActionOutputLen;
097    private static int maxExternalStatsSize;
098
099    private static final String SUCCEEDED = "SUCCEEDED";
100    private static final String KILLED = "KILLED";
101    private static final String FAILED = "FAILED";
102    private static final String FAILED_KILLED = "FAILED/KILLED";
103    private static final String RUNNING = "RUNNING";
104    protected XLog log = XLog.getLog(getClass());
105
106    static {
107        DISALLOWED_PROPERTIES.add(HADOOP_USER);
108        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
109        DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
110        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
111        DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
112    }
113
/**
 * Creates an executor for the <code>java</code> action type.
 */
public JavaActionExecutor() {
    this("java");
}
117
118    protected JavaActionExecutor(String type) {
119        super(type);
120        requiresNNJT = true;
121        useLauncherJar = getOozieConf().getBoolean(OOZIE_ACTION_SHIP_LAUNCHER_JAR, true);
122    }
123
124    protected String getLauncherJarName() {
125        return getType() + "-launcher.jar";
126    }
127
128    protected List<Class> getLauncherClasses() {
129        List<Class> classes = new ArrayList<Class>();
130        classes.add(LauncherMapper.class);
131        classes.add(LauncherSecurityManager.class);
132        classes.add(LauncherException.class);
133        classes.add(LauncherMainException.class);
134        classes.add(PrepareActionsDriver.class);
135        classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
136        classes.add(ActionStats.class);
137        classes.add(ActionType.class);
138        return classes;
139    }
140
141    @Override
142    public void initActionType() {
143        XLog log = XLog.getLog(getClass());
144        super.initActionType();
145        maxActionOutputLen = getOozieConf()
146          .getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
147          // TODO: Remove the below config get in a subsequent release..
148          // This other irrelevant property is only used to
149          // preserve backwards compatibility cause of a typo.
150          // See OOZIE-4.
151          getOozieConf().getInt(CallbackServlet.CONF_MAX_DATA_LEN,
152            2 * 1024));
153        //Get the limit for the maximum allowed size of action stats
154        maxExternalStatsSize = getOozieConf().getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE, MAX_EXTERNAL_STATS_SIZE_DEFAULT);
155        maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
156
157        createLauncherJar();
158
159        registerError(UnknownHostException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA001");
160        registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.NON_TRANSIENT,
161                "JA002");
162        registerError(DiskChecker.DiskOutOfSpaceException.class.getName(),
163                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA003");
164        registerError(org.apache.hadoop.hdfs.protocol.QuotaExceededException.class.getName(),
165                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
166        registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
167                ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
168        registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
169        registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
170        registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
171        registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
172    }
173
174    public void createLauncherJar() {
175        if (useLauncherJar) {
176            try {
177                List<Class> classes = getLauncherClasses();
178                Class[] launcherClasses = classes.toArray(new Class[classes.size()]);
179                IOUtils.createJar(new File(getOozieRuntimeDir()), getLauncherJarName(), launcherClasses);
180            }
181            catch (IOException ex) {
182                throw new RuntimeException(ex);
183            }
184            catch (java.lang.NoClassDefFoundError err) {
185                ByteArrayOutputStream baos = new ByteArrayOutputStream();
186                err.printStackTrace(new PrintStream(baos));
187                log.warn(baos.toString());
188            }
189         }
190    }
191
192    /**
193     * Get the maximum allowed size of stats
194     *
195     * @return maximum size of stats
196     */
197    public static int getMaxExternalStatsSize() {
198        return maxExternalStatsSize;
199    }
200
201    static void checkForDisallowedProps(Configuration conf, String confName) throws ActionExecutorException {
202        for (String prop : DISALLOWED_PROPERTIES) {
203            if (conf.get(prop) != null) {
204                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA010",
205                        "Property [{0}] not allowed in action [{1}] configuration", prop, confName);
206            }
207        }
208    }
209
210    public JobConf createBaseHadoopConf(Context context, Element actionXml) {
211        Namespace ns = actionXml.getNamespace();
212        String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
213        String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
214        JobConf conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
215        conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
216        conf.set(HADOOP_JOB_TRACKER, jobTracker);
217        conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
218        conf.set(HADOOP_YARN_RM, jobTracker);
219        conf.set(HADOOP_NAME_NODE, nameNode);
220        conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
221        return conf;
222    }
223
224    private void injectLauncherProperties(Configuration srcConf, Configuration launcherConf) {
225        for (Map.Entry<String, String> entry : srcConf) {
226            if (entry.getKey().startsWith("oozie.launcher.")) {
227                String name = entry.getKey().substring("oozie.launcher.".length());
228                String value = entry.getValue();
229                // setting original KEY
230                launcherConf.set(entry.getKey(), value);
231                // setting un-prefixed key (to allow Hadoop job config
232                // for the launcher job
233                launcherConf.set(name, value);
234            }
235        }
236    }
237
238    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
239            throws ActionExecutorException {
240        try {
241            Namespace ns = actionXml.getNamespace();
242            Element e = actionXml.getChild("configuration", ns);
243            if (e != null) {
244                String strConf = XmlUtils.prettyPrint(e).toString();
245                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
246
247                XConfiguration launcherConf = new XConfiguration();
248                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
249                XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
250                injectLauncherProperties(actionDefaultConf, launcherConf);
251                injectLauncherProperties(inlineConf, launcherConf);
252                injectLauncherUseUberMode(launcherConf);
253                checkForDisallowedProps(launcherConf, "launcher configuration");
254                XConfiguration.copy(launcherConf, conf);
255            }
256            return conf;
257        }
258        catch (IOException ex) {
259            throw convertException(ex);
260        }
261    }
262
263    void injectLauncherUseUberMode(Configuration launcherConf) {
264        // Set Uber Mode for the launcher (YARN only, ignored by MR1) if not set by action conf and not disabled in oozie-site
265        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
266            if (getOozieConf().getBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false)) {
267                launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
268            }
269        }
270    }
271
272    public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
273            throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
274        Namespace ns = element.getNamespace();
275        Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
276        while (it.hasNext()) {
277            Element e = it.next();
278            String jobXml = e.getTextTrim();
279            Path path = new Path(appPath, jobXml);
280            FileSystem fs = context.getAppFileSystem();
281            Configuration jobXmlConf = new XConfiguration(fs.open(path));
282            checkForDisallowedProps(jobXmlConf, "job-xml");
283            XConfiguration.copy(jobXmlConf, conf);
284        }
285        Element e = element.getChild("configuration", ns);
286        if (e != null) {
287            String strConf = XmlUtils.prettyPrint(e).toString();
288            XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
289            checkForDisallowedProps(inlineConf, "inline configuration");
290            XConfiguration.copy(inlineConf, conf);
291        }
292    }
293
294    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
295            throws ActionExecutorException {
296        try {
297            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
298            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
299            XConfiguration.injectDefaults(actionDefaults, actionConf);
300
301            has.checkSupportedFilesystem(appPath.toUri());
302
303            parseJobXmlAndConfiguration(context, actionXml, appPath, actionConf);
304            return actionConf;
305        }
306        catch (IOException ex) {
307            throw convertException(ex);
308        }
309        catch (HadoopAccessorException ex) {
310            throw convertException(ex);
311        }
312        catch (URISyntaxException ex) {
313            throw convertException(ex);
314        }
315    }
316
317    Configuration addToCache(Configuration conf, Path appPath, String filePath, boolean archive)
318            throws ActionExecutorException {
319
320        URI uri = null;
321        try {
322            uri = new URI(filePath);
323            URI baseUri = appPath.toUri();
324            if (uri.getScheme() == null) {
325                String resolvedPath = uri.getPath();
326                if (!resolvedPath.startsWith("/")) {
327                    resolvedPath = baseUri.getPath() + "/" + resolvedPath;
328                }
329                uri = new URI(baseUri.getScheme(), baseUri.getAuthority(), resolvedPath, uri.getQuery(), uri.getFragment());
330            }
331            if (archive) {
332                DistributedCache.addCacheArchive(uri.normalize(), conf);
333            }
334            else {
335                String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
336                if (fileName.endsWith(".so") || fileName.contains(".so.")) { // .so files
337                    uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
338                    DistributedCache.addCacheFile(uri.normalize(), conf);
339                }
340                else if (fileName.endsWith(".jar")) { // .jar files
341                    if (!fileName.contains("#")) {
342                        String user = conf.get("user.name");
343                        Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, new Path(uri.normalize()), conf);
344                    }
345                    else {
346                        DistributedCache.addCacheFile(uri, conf);
347                    }
348                }
349                else { // regular files
350                    if (!fileName.contains("#")) {
351                        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), uri.getQuery(), fileName);
352                    }
353                    DistributedCache.addCacheFile(uri.normalize(), conf);
354                }
355            }
356            DistributedCache.createSymlink(conf);
357            return conf;
358        }
359        catch (Exception ex) {
360            XLog.getLog(getClass()).debug(
361                    "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
362                            + XmlUtils.prettyPrint(conf).toString());
363            throw convertException(ex);
364        }
365    }
366
367    String getOozieLauncherJar(Context context) throws ActionExecutorException {
368        try {
369            return new Path(context.getActionDir(), getLauncherJarName()).toString();
370        }
371        catch (Exception ex) {
372            throw convertException(ex);
373        }
374    }
375
376    public void prepareActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
377        try {
378            Path actionDir = context.getActionDir();
379            Path tempActionDir = new Path(actionDir.getParent(), actionDir.getName() + ".tmp");
380            if (!actionFs.exists(actionDir)) {
381                try {
382                    if (useLauncherJar) {
383                        actionFs.copyFromLocalFile(new Path(getOozieRuntimeDir(), getLauncherJarName()), new Path(
384                                tempActionDir, getLauncherJarName()));
385                    }
386                    else {
387                        actionFs.mkdirs(tempActionDir);
388                    }
389                    actionFs.rename(tempActionDir, actionDir);
390                }
391                catch (IOException ex) {
392                    actionFs.delete(tempActionDir, true);
393                    actionFs.delete(actionDir, true);
394                    throw ex;
395                }
396            }
397        }
398        catch (Exception ex) {
399            throw convertException(ex);
400        }
401    }
402
403    void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
404        try {
405            Path actionDir = context.getActionDir();
406            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
407                    && actionFs.exists(actionDir)) {
408                actionFs.delete(actionDir, true);
409            }
410        }
411        catch (Exception ex) {
412            throw convertException(ex);
413        }
414    }
415
416    protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames)
417            throws ActionExecutorException {
418        if (actionShareLibNames != null) {
419            for (String actionShareLibName : actionShareLibNames) {
420                try {
421                    Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
422                    if (systemLibPath != null) {
423                        Path actionLibPath = new Path(systemLibPath, actionShareLibName.trim());
424                        String user = conf.get("user.name");
425                        FileSystem fs;
426                        // If the actionLibPath has a valid scheme and authority, then use them to
427                        // determine the filesystem that the sharelib resides on; otherwise, assume
428                        // it resides on the same filesystem as the appPath and use the appPath to
429                        // determine the filesystem
430                        if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
431                            fs = Services.get().get(HadoopAccessorService.class)
432                                    .createFileSystem(user, actionLibPath.toUri(), conf);
433                        }
434                        else {
435                            fs = Services.get().get(HadoopAccessorService.class)
436                                    .createFileSystem(user, appPath.toUri(), conf);
437                        }
438                        if (fs.exists(actionLibPath)) {
439                            FileStatus[] files = fs.listStatus(actionLibPath);
440                            for (FileStatus file : files) {
441                                addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
442                            }
443                        }
444                    }
445                }
446                catch (HadoopAccessorException ex) {
447                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode()
448                            .toString(), ex.getMessage());
449                }
450                catch (IOException ex) {
451                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
452                            "It should never happen", ex.getMessage());
453                }
454            }
455        }
456    }
457
458    protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException {
459        String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath");
460        if (actionLibsStrArr != null) {
461            try {
462                for (String actionLibsStr : actionLibsStrArr) {
463                    actionLibsStr = actionLibsStr.trim();
464                    if (actionLibsStr.length() > 0)
465                    {
466                        Path actionLibsPath = new Path(actionLibsStr);
467                        String user = conf.get("user.name");
468                        FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
469                        if (fs.exists(actionLibsPath)) {
470                            FileStatus[] files = fs.listStatus(actionLibsPath);
471                            for (FileStatus file : files) {
472                                addToCache(conf, appPath, file.getPath().toUri().getPath(), false);
473                            }
474                        }
475                    }
476                }
477            }
478            catch (HadoopAccessorException ex){
479                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
480                        ex.getErrorCode().toString(), ex.getMessage());
481            }
482            catch (IOException ex){
483                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
484                        "It should never happen", ex.getMessage());
485            }
486        }
487    }
488
489    @SuppressWarnings("unchecked")
490    void setLibFilesArchives(Context context, Element actionXml, Path appPath, Configuration conf)
491            throws ActionExecutorException {
492        Configuration proto = context.getProtoActionConf();
493
494        // launcher JAR
495        if (useLauncherJar) {
496            addToCache(conf, appPath, getOozieLauncherJar(context), false);
497        }
498
499        // Workflow lib/
500        String[] paths = proto.getStrings(WorkflowAppService.APP_LIB_PATH_LIST);
501        if (paths != null) {
502            for (String path : paths) {
503                addToCache(conf, appPath, path, false);
504            }
505        }
506
507        // Action libs
508        addActionLibs(appPath, conf);
509
510        // files and archives defined in the action
511        for (Element eProp : (List<Element>) actionXml.getChildren()) {
512            if (eProp.getName().equals("file")) {
513                String[] filePaths = eProp.getTextTrim().split(",");
514                for (String path : filePaths) {
515                    addToCache(conf, appPath, path.trim(), false);
516                }
517            }
518            else if (eProp.getName().equals("archive")) {
519                String[] archivePaths = eProp.getTextTrim().split(",");
520                for (String path : archivePaths){
521                    addToCache(conf, appPath, path.trim(), true);
522                }
523            }
524        }
525
526        addAllShareLibs(appPath, conf, context, actionXml);
527        }
528
529    // Adds action specific share libs and common share libs
530    private void addAllShareLibs(Path appPath, Configuration conf, Context context, Element actionXml)
531            throws ActionExecutorException {
532        // Add action specific share libs
533        addActionShareLib(appPath, conf, context, actionXml);
534        // Add common sharelibs for Oozie
535        addShareLib(appPath, conf, new String[]{JavaActionExecutor.OOZIE_COMMON_LIBDIR});
536    }
537
538    private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
539            throws ActionExecutorException {
540        XConfiguration wfJobConf = null;
541        try {
542            wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
543        }
544        catch (IOException ioe) {
545            throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
546                    ioe.getMessage());
547        }
548        // Action sharelibs are only added if user has specified to use system libpath
549        if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
550            // add action specific sharelibs
551            addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf));
552        }
553    }
554
555
556    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
557        Namespace ns = actionXml.getNamespace();
558        Element e = actionXml.getChild("main-class", ns);
559        return e.getTextTrim();
560    }
561
private static final String QUEUE_NAME = "mapred.job.queue.name";

// Action properties that are also propagated to the launcher job (queue name and job ACLs).
private static final Set<String> SPECIAL_PROPERTIES = new HashSet<String>();

static {
    final String[] propagated = {QUEUE_NAME, ACL_VIEW_JOB, ACL_MODIFY_JOB};
    for (String property : propagated) {
        SPECIAL_PROPERTIES.add(property);
    }
}
571
572    @SuppressWarnings("unchecked")
573    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
574            throws ActionExecutorException {
575        try {
576
577            // app path could be a file
578            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
579            if (actionFs.isFile(appPathRoot)) {
580                appPathRoot = appPathRoot.getParent();
581            }
582
583            // launcher job configuration
584            JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
585            setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
586
587            String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
588            if (actionShareLibProperty != null) {
589                launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
590            }
591            setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
592
593            String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
594            if (jobName == null || jobName.isEmpty()) {
595                jobName = XLog.format(
596                        "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
597                        context.getWorkflow().getAppName(), action.getName(),
598                        context.getWorkflow().getId());
599            launcherJobConf.setJobName(jobName);
600            }
601
602            String jobId = context.getWorkflow().getId();
603            String actionId = action.getId();
604            Path actionDir = context.getActionDir();
605            String recoveryId = context.getRecoveryId();
606
607            // Getting the prepare XML from the action XML
608            Namespace ns = actionXml.getNamespace();
609            Element prepareElement = actionXml.getChild("prepare", ns);
610            String prepareXML = "";
611            if (prepareElement != null) {
612                if (prepareElement.getChildren().size() > 0) {
613                    prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
614                }
615            }
616            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
617                    prepareXML);
618
619            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
620            LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
621            LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
622            LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
623
624            List<Element> list = actionXml.getChildren("arg", ns);
625            String[] args = new String[list.size()];
626            for (int i = 0; i < list.size(); i++) {
627                args[i] = list.get(i).getTextTrim();
628            }
629            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
630
631            List<Element> javaopts = actionXml.getChildren("java-opt", ns);
632            for (Element opt: javaopts) {
633                String opts = launcherJobConf.get("mapred.child.java.opts", "");
634                opts = opts + " " + opt.getTextTrim();
635                opts = opts.trim();
636                launcherJobConf.set("mapred.child.java.opts", opts);
637            }
638
639            Element opt = actionXml.getChild("java-opts", ns);
640            if (opt != null) {
641                String opts = launcherJobConf.get("mapred.child.java.opts", "");
642                opts = opts + " " + opt.getTextTrim();
643                opts = opts.trim();
644                launcherJobConf.set("mapred.child.java.opts", opts);
645            }
646
647            // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
648            // maybe we should add queue to the WF schema, below job-tracker
649            actionConfToLauncherConf(actionConf, launcherJobConf);
650
651            // to disable cancelation of delegation token on launcher job end
652            launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
653
654            return launcherJobConf;
655        }
656        catch (Exception ex) {
657            throw convertException(ex);
658        }
659    }
660
661    private void injectCallback(Context context, Configuration conf) {
662        String callback = context.getCallbackUrl("$jobStatus");
663        if (conf.get("job.end.notification.url") != null) {
664            XLog.getLog(getClass()).warn("Overriding the action job end notification URI");
665        }
666        conf.set("job.end.notification.url", callback);
667    }
668
    /**
     * Injects the Oozie callback URL into the action configuration.
     * <p/>
     * Delegates to <code>injectCallback</code>, which sets <code>job.end.notification.url</code>.
     *
     * @param context executor context providing the callback URL.
     * @param actionConf action configuration to update.
     */
    void injectActionCallback(Context context, Configuration actionConf) {
        injectCallback(context, actionConf);
    }
672
    /**
     * Injects the Oozie callback URL into the launcher configuration.
     * <p/>
     * Delegates to <code>injectCallback</code>, which sets <code>job.end.notification.url</code>.
     *
     * @param context executor context providing the callback URL.
     * @param launcherConf launcher configuration to update.
     */
    void injectLauncherCallback(Context context, Configuration launcherConf) {
        injectCallback(context, launcherConf);
    }
676
677    private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
678        for (String name : SPECIAL_PROPERTIES) {
679            if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
680                launcherConf.set(name, actionConf.get(name));
681            }
682        }
683    }
684
685    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
686        JobClient jobClient = null;
687        boolean exception = false;
688        try {
689            Path appPathRoot = new Path(context.getWorkflow().getAppPath());
690
691            // app path could be a file
692            if (actionFs.isFile(appPathRoot)) {
693                appPathRoot = appPathRoot.getParent();
694            }
695
696            Element actionXml = XmlUtils.parseXml(action.getConf());
697
698            // action job configuration
699            Configuration actionConf = createBaseHadoopConf(context, actionXml);
700            setupActionConf(actionConf, context, actionXml, appPathRoot);
701            XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
702            setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
703
704            String jobName = actionConf.get(HADOOP_JOB_NAME);
705            if (jobName == null || jobName.isEmpty()) {
706                jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
707                        getType(), context.getWorkflow().getAppName(),
708                        action.getName(), context.getWorkflow().getId());
709                actionConf.set(HADOOP_JOB_NAME, jobName);
710            }
711
712            injectActionCallback(context, actionConf);
713
714            if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
715                // ONLY in the case where user has not given the
716                // modify-job ACL specifically
717                if (context.getWorkflow().getAcl() != null) {
718                    // setting the group owning the Oozie job to allow anybody in that
719                    // group to modify the jobs.
720                    actionConf.set(ACL_MODIFY_JOB, context.getWorkflow().getAcl());
721                }
722            }
723
724            // Setting the credential properties in launcher conf
725            HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
726                    action, actionConf);
727
728            // Adding if action need to set more credential tokens
729            JobConf credentialsConf = new JobConf(false);
730            XConfiguration.copy(actionConf, credentialsConf);
731            setCredentialTokens(credentialsConf, context, action, credentialsProperties);
732
733            // insert conf to action conf from credentialsConf
734            for (Entry<String, String> entry : credentialsConf) {
735                if (actionConf.get(entry.getKey()) == null) {
736                    actionConf.set(entry.getKey(), entry.getValue());
737                }
738            }
739
740            JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
741            injectLauncherCallback(context, launcherJobConf);
742            XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
743            jobClient = createJobClient(context, launcherJobConf);
744            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
745                    .getRecoveryId());
746            boolean alreadyRunning = launcherId != null;
747            RunningJob runningJob;
748
749            // if user-retry is on, always submit new launcher
750            boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
751
752            if (alreadyRunning && !isUserRetry) {
753                runningJob = jobClient.getJob(JobID.forName(launcherId));
754                if (runningJob == null) {
755                    String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
756                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
757                            "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
758                }
759            }
760            else {
761                XLog.getLog(getClass()).debug("Submitting the job through Job Client for action " + action.getId());
762
763                // setting up propagation of the delegation token.
764                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
765                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
766                        .getMRDelegationTokenRenewer(launcherJobConf));
767                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
768
769                // insert credentials tokens to launcher job conf if needed
770                if (needInjectCredentials()) {
771                    for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
772                        log.debug("ADDING TOKEN: " + tk.getKind().toString());
773                        launcherJobConf.getCredentials().addToken(tk.getKind(), tk);
774                    }
775                }
776                else {
777                    log.info("No need to inject credentials.");
778                }
779                runningJob = jobClient.submitJob(launcherJobConf);
780                if (runningJob == null) {
781                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
782                            "Error submitting launcher for action [{0}]", action.getId());
783                }
784                launcherId = runningJob.getID().toString();
785                XLog.getLog(getClass()).debug("After submission get the launcherId " + launcherId);
786            }
787
788            String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
789            String consoleUrl = runningJob.getTrackingURL();
790            context.setStartData(launcherId, jobTracker, consoleUrl);
791        }
792        catch (Exception ex) {
793            exception = true;
794            throw convertException(ex);
795        }
796        finally {
797            if (jobClient != null) {
798                try {
799                    jobClient.close();
800                }
801                catch (Exception e) {
802                    if (exception) {
803                        log.error("JobClient error: ", e);
804                    }
805                    else {
806                        throw convertException(e);
807                    }
808                }
809            }
810        }
811    }
812
813    private boolean needInjectCredentials() {
814        boolean methodExists = true;
815
816        Class klass;
817        try {
818            klass = Class.forName("org.apache.hadoop.mapred.JobConf");
819            klass.getMethod("getCredentials");
820        }
821        catch (ClassNotFoundException ex) {
822            methodExists = false;
823        }
824        catch (NoSuchMethodException ex) {
825            methodExists = false;
826        }
827
828        return methodExists;
829    }
830
831    protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
832            WorkflowAction action, Configuration actionConf) throws Exception {
833        HashMap<String, CredentialsProperties> credPropertiesMap = null;
834        if (context != null && action != null) {
835            credPropertiesMap = getActionCredentialsProperties(context, action);
836            if (credPropertiesMap != null) {
837                for (String key : credPropertiesMap.keySet()) {
838                    CredentialsProperties prop = credPropertiesMap.get(key);
839                    if (prop != null) {
840                        log.debug("Credential Properties set for action : " + action.getId());
841                        for (String property : prop.getProperties().keySet()) {
842                            actionConf.set(property, prop.getProperties().get(property));
843                            log.debug("property : '" + property + "', value : '" + prop.getProperties().get(property) + "'");
844                        }
845                    }
846                }
847            }
848            else {
849                log.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
850            }
851        }
852        else {
853            log.warn("context or action is null");
854        }
855        return credPropertiesMap;
856    }
857
858    protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
859            HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
860
861        if (context != null && action != null && credPropertiesMap != null) {
862            for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
863                String credName = entry.getKey();
864                CredentialsProperties credProps = entry.getValue();
865                if (credProps != null) {
866                    CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
867                    Credentials credentialObject = credProvider.createCredentialObject();
868                    if (credentialObject != null) {
869                        credentialObject.addtoJobConf(jobconf, credProps, context);
870                        log.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
871                    }
872                    else {
873                        log.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
874                    }
875                }
876            }
877        }
878
879    }
880
881    protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
882            WorkflowAction action) throws Exception {
883        HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();
884        if (context != null && action != null) {
885            String credsInAction = action.getCred();
886            log.debug("Get credential '" + credsInAction + "' properties for action : " + action.getId());
887            String[] credNames = credsInAction.split(",");
888            for (String credName : credNames) {
889                CredentialsProperties credProps = getCredProperties(context, credName);
890                props.put(credName, credProps);
891            }
892        }
893        else {
894            log.warn("context or action is null");
895        }
896        return props;
897    }
898
899    @SuppressWarnings("unchecked")
900    protected CredentialsProperties getCredProperties(Context context, String credName)
901            throws Exception {
902        CredentialsProperties credProp = null;
903        String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
904        XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
905        Element elementJob = XmlUtils.parseXml(workflowXml);
906        Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
907        if (credentials != null) {
908            for (Element credential : (List<Element>) credentials.getChildren("credential", credentials.getNamespace())) {
909                String name = credential.getAttributeValue("name");
910                String type = credential.getAttributeValue("type");
911                log.debug("getCredProperties: Name: " + name + ", Type: " + type);
912                if (name.equalsIgnoreCase(credName)) {
913                    credProp = new CredentialsProperties(name, type);
914                    for (Element property : (List<Element>) credential.getChildren("property",
915                            credential.getNamespace())) {
916                        String propertyName = property.getChildText("name", property.getNamespace());
917                        String propertyValue = property.getChildText("value", property.getNamespace());
918                        ELEvaluator eval = new ELEvaluator();
919                        for (Map.Entry<String, String> entry : wfjobConf) {
920                            eval.setVariable(entry.getKey(), entry.getValue().trim());
921                        }
922                        propertyName = eval.evaluate(propertyName, String.class);
923                        propertyValue = eval.evaluate(propertyValue, String.class);
924
925                        credProp.getProperties().put(propertyName, propertyValue);
926                        log.debug("getCredProperties: Properties name :'" + propertyName + "', Value : '"
927                                + propertyValue + "'");
928                    }
929                }
930            }
931        } else {
932            log.warn("credentials is null for the action");
933        }
934        return credProp;
935    }
936
937    @Override
938    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
939        try {
940            XLog.getLog(getClass()).debug("Starting action " + action.getId() + " getting Action File System");
941            FileSystem actionFs = context.getAppFileSystem();
942            XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
943            prepareActionDir(actionFs, context);
944            XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
945            submitLauncher(actionFs, context, action);
946            XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
947            check(context, action);
948            XLog.getLog(getClass()).debug("Action check is done after submission");
949        }
950        catch (Exception ex) {
951            throw convertException(ex);
952        }
953    }
954
955    @Override
956    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
957        try {
958            String externalStatus = action.getExternalStatus();
959            WorkflowAction.Status status = externalStatus.equals(SUCCEEDED) ? WorkflowAction.Status.OK
960                    : WorkflowAction.Status.ERROR;
961            context.setEndData(status, getActionSignal(status));
962        }
963        catch (Exception ex) {
964            throw convertException(ex);
965        }
966        finally {
967            try {
968                FileSystem actionFs = context.getAppFileSystem();
969                cleanUpActionDir(actionFs, context);
970            }
971            catch (Exception ex) {
972                throw convertException(ex);
973            }
974        }
975    }
976
977    /**
978     * Create job client object
979     *
980     * @param context
981     * @param jobConf
982     * @return
983     * @throws HadoopAccessorException
984     */
985    protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
986        String user = context.getWorkflow().getUser();
987        String group = context.getWorkflow().getGroup();
988        return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
989    }
990
991    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
992        RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
993        return runningJob;
994    }
995
996    @Override
997    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
998        JobClient jobClient = null;
999        boolean exception = false;
1000        try {
1001            Element actionXml = XmlUtils.parseXml(action.getConf());
1002            FileSystem actionFs = context.getAppFileSystem();
1003            JobConf jobConf = createBaseHadoopConf(context, actionXml);
1004            jobClient = createJobClient(context, jobConf);
1005            RunningJob runningJob = getRunningJob(context, action, jobClient);
1006            if (runningJob == null) {
1007                context.setExternalStatus(FAILED);
1008                context.setExecutionData(FAILED, null);
1009                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1010                        "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", action
1011                                .getExternalId(), action.getId());
1012            }
1013            if (runningJob.isComplete()) {
1014                Path actionDir = context.getActionDir();
1015
1016                String user = context.getWorkflow().getUser();
1017                String group = context.getWorkflow().getGroup();
1018                if (LauncherMapperHelper.hasIdSwap(runningJob, user, group, actionDir)) {
1019                    String launcherId = action.getExternalId();
1020                    Path idSwapPath = LauncherMapperHelper.getIdSwapPath(context.getActionDir());
1021                    InputStream is = actionFs.open(idSwapPath);
1022                    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1023                    Properties props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1024                    reader.close();
1025                    String newId = props.getProperty("id");
1026                    runningJob = jobClient.getJob(JobID.forName(newId));
1027                    if (runningJob == null) {
1028                        context.setExternalStatus(FAILED);
1029                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
1030                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
1031                                action.getId());
1032                    }
1033
1034                    context.setExternalChildIDs(newId);
1035                    XLog.getLog(getClass()).info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
1036                            newId);
1037                }
1038                if (runningJob.isComplete()) {
1039                    XLog.getLog(getClass()).info(XLog.STD, "action completed, external ID [{0}]",
1040                            action.getExternalId());
1041                    if (runningJob.isSuccessful() && LauncherMapperHelper.isMainSuccessful(runningJob)) {
1042                        getActionData(actionFs, runningJob, action, context);
1043                        XLog.getLog(getClass()).info(XLog.STD, "action produced output");
1044                    }
1045                    else {
1046                        XLog log = XLog.getLog(getClass());
1047                        String errorReason;
1048                        Path actionError = LauncherMapperHelper.getErrorPath(context.getActionDir());
1049                        if (actionFs.exists(actionError)) {
1050                            InputStream is = actionFs.open(actionError);
1051                            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1052                            Properties props = PropertiesUtils.readProperties(reader, -1);
1053                            reader.close();
1054                            String errorCode = props.getProperty("error.code");
1055                            if (errorCode.equals("0")) {
1056                                errorCode = "JA018";
1057                            }
1058                            if (errorCode.equals("-1")) {
1059                                errorCode = "JA019";
1060                            }
1061                            errorReason = props.getProperty("error.reason");
1062                            log.warn("Launcher ERROR, reason: {0}", errorReason);
1063                            String exMsg = props.getProperty("exception.message");
1064                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
1065                            context.setErrorInfo(errorCode, errorInfo);
1066                            String exStackTrace = props.getProperty("exception.stacktrace");
1067                            if (exMsg != null) {
1068                                log.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
1069                            }
1070                        }
1071                        else {
1072                            errorReason = XLog.format("LauncherMapper died, check Hadoop log for job [{0}:{1}]", action
1073                                    .getTrackerUri(), action.getExternalId());
1074                            log.warn(errorReason);
1075                        }
1076                        context.setExecutionData(FAILED_KILLED, null);
1077                        setActionCompletionData(context, actionFs);
1078                    }
1079                }
1080                else {
1081                    context.setExternalStatus(RUNNING);
1082                    XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1083                            action.getExternalId(), action.getExternalStatus());
1084                }
1085            }
1086            else {
1087                context.setExternalStatus(RUNNING);
1088                XLog.getLog(getClass()).info(XLog.STD, "checking action, external ID [{0}] status [{1}]",
1089                        action.getExternalId(), action.getExternalStatus());
1090            }
1091        }
1092        catch (Exception ex) {
1093            XLog.getLog(getClass()).warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
1094            exception = true;
1095            throw convertException(ex);
1096        }
1097        finally {
1098            if (jobClient != null) {
1099                try {
1100                    jobClient.close();
1101                }
1102                catch (Exception e) {
1103                    if (exception) {
1104                        log.error("JobClient error: ", e);
1105                    }
1106                    else {
1107                        throw convertException(e);
1108                    }
1109                }
1110            }
1111        }
1112    }
1113
1114    /**
1115     * Get the output data of an action. Subclasses should override this method
1116     * to get action specific output data.
1117     *
1118     * @param actionFs the FileSystem object
1119     * @param runningJob the runningJob
1120     * @param action the Workflow action
1121     * @param context executor context
1122     *
1123     */
1124    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
1125            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
1126        Properties props = null;
1127        if (getCaptureOutput(action)) {
1128            props = new Properties();
1129            if (LauncherMapperHelper.hasOutputData(runningJob)) {
1130                Path actionOutput = LauncherMapperHelper.getOutputDataPath(context.getActionDir());
1131                InputStream is = actionFs.open(actionOutput);
1132                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
1133                props = PropertiesUtils.readProperties(reader, maxActionOutputLen);
1134                reader.close();
1135            }
1136        }
1137        context.setExecutionData(SUCCEEDED, props);
1138    }
1139
1140    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
1141        Element eConf = XmlUtils.parseXml(action.getConf());
1142        Namespace ns = eConf.getNamespace();
1143        Element captureOutput = eConf.getChild("capture-output", ns);
1144        return captureOutput != null;
1145    }
1146
1147    @Override
1148    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
1149        JobClient jobClient = null;
1150        boolean exception = false;
1151        try {
1152            Element actionXml = XmlUtils.parseXml(action.getConf());
1153            JobConf jobConf = createBaseHadoopConf(context, actionXml);
1154            jobClient = createJobClient(context, jobConf);
1155            RunningJob runningJob = getRunningJob(context, action, jobClient);
1156            if (runningJob != null) {
1157                runningJob.killJob();
1158            }
1159            context.setExternalStatus(KILLED);
1160            context.setExecutionData(KILLED, null);
1161        }
1162        catch (Exception ex) {
1163            exception = true;
1164            throw convertException(ex);
1165        }
1166        finally {
1167            try {
1168                FileSystem actionFs = context.getAppFileSystem();
1169                cleanUpActionDir(actionFs, context);
1170                if (jobClient != null) {
1171                    jobClient.close();
1172                }
1173            }
1174            catch (Exception ex) {
1175                if (exception) {
1176                    log.error("Error: ", ex);
1177                }
1178                else {
1179                    throw convertException(ex);
1180                }
1181            }
1182        }
1183    }
1184
1185    private static Set<String> FINAL_STATUS = new HashSet<String>();
1186
1187    static {
1188        FINAL_STATUS.add(SUCCEEDED);
1189        FINAL_STATUS.add(KILLED);
1190        FINAL_STATUS.add(FAILED);
1191        FINAL_STATUS.add(FAILED_KILLED);
1192    }
1193
1194    @Override
1195    public boolean isCompleted(String externalStatus) {
1196        return FINAL_STATUS.contains(externalStatus);
1197    }
1198
1199
1200    /**
1201     * Return the sharelib names for the action.
1202     * <p/>
1203     * If <code>NULL</code> or empty, it means that the action does not use the action
1204     * sharelib.
1205     * <p/>
1206     * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
1207     * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
1208     * <code>foo</code> directory will be in the action classpath. Multiple sharelib
1209     * sub-directories can be specified as a comma separated list.
1210     * <p/>
1211     * The resolution is done using the following precedence order:
1212     * <ul>
1213     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the action configuration</li>
1214     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the job configuration</li>
1215     *     <li><b>action.sharelib.for.#ACTIONTYPE#</b> in the oozie configuration</li>
1216     *     <li>Action Executor <code>getDefaultShareLibName()</code> method</li>
1217     * </ul>
1218     *
1219     *
1220     * @param context executor context.
1221     * @param actionXml
1222     * @param conf action configuration.
1223     * @return the action sharelib names.
1224     */
1225    protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
1226        String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
1227        if (names == null || names.length == 0) {
1228            try {
1229                XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
1230                names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
1231                if (names == null || names.length == 0) {
1232                    names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
1233                    if (names == null || names.length == 0) {
1234                        String name = getDefaultShareLibName(actionXml);
1235                        if (name != null) {
1236                            names = new String[] { name };
1237                        }
1238                    }
1239                }
1240            }
1241            catch (IOException ex) {
1242                throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
1243            }
1244        }
1245        return names;
1246    }
1247
1248    private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";
1249
1250
    /**
     * Returns the default sharelib name for the action if any.
     * <p/>
     * This implementation always returns <code>NULL</code>. The value is used by
     * <code>getShareLibNames()</code> only when no sharelib is configured for the action type.
     *
     * @param actionXml the action XML fragment.
     * @return the sharelib name for the action, <code>NULL</code> if none.
     */
    protected String getDefaultShareLibName(Element actionXml) {
        return null;
    }
1260
    /**
     * Sets some data for the action on completion
     * <p/>
     * This implementation does nothing. It is invoked by <code>check()</code> after the execution
     * data of an unsuccessful action has been set.
     *
     * @param context executor context
     * @param actionFs the FileSystem object
     */
    protected void setActionCompletionData(Context context, FileSystem actionFs) throws IOException,
            HadoopAccessorException, URISyntaxException {
        // intentionally empty
    }
1270}