001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.mapred.Counters;
028import org.apache.hadoop.mapred.JobClient;
029import org.apache.hadoop.mapred.JobConf;
030import org.apache.hadoop.mapred.JobID;
031import org.apache.hadoop.mapred.RunningJob;
032import org.apache.oozie.action.ActionExecutorException;
033import org.apache.oozie.client.WorkflowAction;
034import org.apache.oozie.service.Services;
035import org.apache.oozie.util.XConfiguration;
036import org.apache.oozie.util.XLog;
037import org.apache.oozie.util.XmlUtils;
038import org.jdom.Element;
039import org.jdom.Namespace;
040import org.json.simple.JSONObject;
041
042public class MapReduceActionExecutor extends JavaActionExecutor {
043
044    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
045    public static final String HADOOP_COUNTERS = "hadoop.counters";
046    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
047    private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
048    private XLog log = XLog.getLog(getClass());
049
050    public MapReduceActionExecutor() {
051        super("map-reduce");
052    }
053
054    @Override
055    protected List<Class> getLauncherClasses() {
056        List<Class> classes = super.getLauncherClasses();
057        classes.add(LauncherMain.class);
058        classes.add(MapReduceMain.class);
059        classes.add(PipesMain.class);
060        try {
061            classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
062        }
063        catch (ClassNotFoundException e) {
064            throw new RuntimeException("Class not found", e);
065        }
066        return classes;
067    }
068
069    @Override
070    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
071        String mainClass;
072        Namespace ns = actionXml.getNamespace();
073        if (actionXml.getChild("streaming", ns) != null) {
074            mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, STREAMING_MAIN_CLASS_NAME);
075        }
076        else {
077            if (actionXml.getChild("pipes", ns) != null) {
078                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PipesMain.class.getName());
079            }
080            else {
081                mainClass = launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, MapReduceMain.class.getName());
082            }
083        }
084        return mainClass;
085    }
086
087    @Override
088    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
089        super.setupLauncherConf(conf, actionXml, appPath, context);
090        conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
091        return conf;
092    }
093
094    @Override
095    @SuppressWarnings("unchecked")
096    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
097            throws ActionExecutorException {
098        boolean regularMR = false;
099        Namespace ns = actionXml.getNamespace();
100        if (actionXml.getChild("streaming", ns) != null) {
101            Element streamingXml = actionXml.getChild("streaming", ns);
102            String mapper = streamingXml.getChildTextTrim("mapper", ns);
103            String reducer = streamingXml.getChildTextTrim("reducer", ns);
104            String recordReader = streamingXml.getChildTextTrim("record-reader", ns);
105            List<Element> list = (List<Element>) streamingXml.getChildren("record-reader-mapping", ns);
106            String[] recordReaderMapping = new String[list.size()];
107            for (int i = 0; i < list.size(); i++) {
108                recordReaderMapping[i] = list.get(i).getTextTrim();
109            }
110            list = (List<Element>) streamingXml.getChildren("env", ns);
111            String[] env = new String[list.size()];
112            for (int i = 0; i < list.size(); i++) {
113                env[i] = list.get(i).getTextTrim();
114            }
115            setStreaming(actionConf, mapper, reducer, recordReader, recordReaderMapping, env);
116        }
117        else {
118            if (actionXml.getChild("pipes", ns) != null) {
119                Element pipesXml = actionXml.getChild("pipes", ns);
120                String map = pipesXml.getChildTextTrim("map", ns);
121                String reduce = pipesXml.getChildTextTrim("reduce", ns);
122                String inputFormat = pipesXml.getChildTextTrim("inputformat", ns);
123                String partitioner = pipesXml.getChildTextTrim("partitioner", ns);
124                String writer = pipesXml.getChildTextTrim("writer", ns);
125                String program = pipesXml.getChildTextTrim("program", ns);
126                PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
127            }
128            else {
129                regularMR = true;
130            }
131        }
132        actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
133
134        // For "regular" (not streaming or pipes) MR jobs
135        if (regularMR) {
136            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
137            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
138            if (uberJar != null) {
139                if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
140                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
141                            "{0} property is not allowed.  Set {1} to true in oozie-site to enable.",
142                            MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
143                }
144                String nameNode = actionXml.getChildTextTrim("name-node", ns);
145                if (nameNode != null) {
146                    Path uberJarPath = new Path(uberJar);
147                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
148                        if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
149                            Path nameNodePath = new Path(nameNode);
150                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
151                                    + "://" + nameNodePath.toUri().getAuthority();
152                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR,
153                                    new Path(nameNodeSchemeAuthority + uberJarPath).toString());
154                        }
155                        else {                              // relative path --> prepend app path
156                            actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
157                        }
158                    }
159                }
160            }
161        }
162        else {
163            if (actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR) != null) {
164                log.warn("The " + MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not"
165                        + "streaming nor pipes) workflows, ignoring");
166                actionConf.set(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR, "");
167            }
168        }
169
170        return actionConf;
171    }
172
173    @Override
174    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
175        super.end(context, action);
176        JobClient jobClient = null;
177        boolean exception = false;
178        try {
179            if (action.getStatus() == WorkflowAction.Status.OK) {
180                Element actionXml = XmlUtils.parseXml(action.getConf());
181                JobConf jobConf = createBaseHadoopConf(context, actionXml);
182                jobClient = createJobClient(context, jobConf);
183                RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
184                if (runningJob == null) {
185                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR002",
186                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
187                            action.getExternalChildIDs(), action.getId());
188                }
189
190                Counters counters = runningJob.getCounters();
191                if (counters != null) {
192                    ActionStats stats = new MRStats(counters);
193                    String statsJsonString = stats.toJSON();
194                    context.setVar(HADOOP_COUNTERS, statsJsonString);
195
196                    // If action stats write property is set to false by user or
197                    // size of stats is greater than the maximum allowed size,
198                    // do not store the action stats
199                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
200                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "false"))
201                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
202                        context.setExecutionStats(statsJsonString);
203                        log.debug(
204                                "Printing stats for Map-Reduce action as a JSON string : [{0}]", statsJsonString);
205                    }
206                }
207                else {
208                    context.setVar(HADOOP_COUNTERS, "");
209                    XLog.getLog(getClass()).warn("Could not find Hadoop Counters for: [{0}]",
210                            action.getExternalChildIDs());
211                }
212            }
213        }
214        catch (Exception ex) {
215            exception = true;
216            throw convertException(ex);
217        }
218        finally {
219            if (jobClient != null) {
220                try {
221                    jobClient.close();
222                }
223                catch (Exception e) {
224                    if (exception) {
225                        log.error("JobClient error: ", e);
226                    }
227                    else {
228                        throw convertException(e);
229                    }
230                }
231            }
232        }
233    }
234
235    // Return the value of the specified configuration property
236    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
237        try {
238            if (actionConf != null) {
239                Namespace ns = actionConf.getNamespace();
240                Element e = actionConf.getChild("configuration", ns);
241                String strConf = XmlUtils.prettyPrint(e).toString();
242                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
243                return inlineConf.get(key, defaultValue);
244            }
245            return "";
246        }
247        catch (IOException ex) {
248            throw convertException(ex);
249        }
250    }
251
252    @SuppressWarnings("unchecked")
253    private JSONObject counterstoJson(Counters counters) {
254
255        if (counters == null) {
256            return null;
257        }
258
259        JSONObject groups = new JSONObject();
260        for (String gName : counters.getGroupNames()) {
261            JSONObject group = new JSONObject();
262            for (Counters.Counter counter : counters.getGroup(gName)) {
263                String cName = counter.getName();
264                Long cValue = counter.getCounter();
265                group.put(cName, cValue);
266            }
267            groups.put(gName, group);
268        }
269        return groups;
270    }
271
272    /**
273     * Return the sharelib name for the action.
274     *
275     * @return returns <code>streaming</code> if mapreduce-streaming action, <code>NULL</code> otherwise.
276     * @param actionXml
277     */
278    @Override
279    protected String getDefaultShareLibName(Element actionXml) {
280        Namespace ns = actionXml.getNamespace();
281        return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
282    }
283
284    @Override
285    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
286            Configuration actionConf) throws ActionExecutorException {
287        // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
288        // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
289        // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
290        // argument and we can just look up the uber jar in the actionConf argument.
291        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
292        Namespace ns = actionXml.getNamespace();
293        if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
294            // Set for uber jar
295            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
296            if (uberJar != null && uberJar.trim().length() > 0) {
297                launcherJobConf.setJar(uberJar);
298            }
299        }
300        return launcherJobConf;
301    }
302
303    public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
304                                    String[] recordReaderMapping, String[] env) {
305        if (mapper != null) {
306            conf.set("oozie.streaming.mapper", mapper);
307        }
308        if (reducer != null) {
309            conf.set("oozie.streaming.reducer", reducer);
310        }
311        if (recordReader != null) {
312            conf.set("oozie.streaming.record-reader", recordReader);
313        }
314        MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
315        MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
316    }
317
318    @Override
319    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
320
321        RunningJob runningJob;
322        String launcherJobId = action.getExternalId();
323        String childJobId = action.getExternalChildIDs();
324
325        if (childJobId != null && childJobId.length() > 0) {
326            runningJob = jobClient.getJob(JobID.forName(childJobId));
327        }
328        else {
329            runningJob = jobClient.getJob(JobID.forName(launcherJobId));
330        }
331
332        return runningJob;
333    }
334}