001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.BufferedReader;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.OutputStream;
025import java.io.OutputStreamWriter;
026import java.io.Writer;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.mapred.Counters;
036import org.apache.hadoop.mapred.JobConf;
037import org.apache.hadoop.mapred.RunningJob;
038import org.apache.oozie.service.HadoopAccessorException;
039import org.apache.oozie.service.HadoopAccessorService;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.service.URIHandlerService;
042import org.apache.oozie.util.XLog;
043
044public class LauncherMapperHelper {
045
046    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
047            throws HadoopAccessorException, IOException {
048        String jobId = null;
049        Path recoveryFile = new Path(actionDir, recoveryId);
050        FileSystem fs = Services.get().get(HadoopAccessorService.class)
051                .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
052
053        if (fs.exists(recoveryFile)) {
054            InputStream is = fs.open(recoveryFile);
055            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
056            jobId = reader.readLine();
057            reader.close();
058        }
059        return jobId;
060
061    }
062
063    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
064        // Only set the javaMainClass if its not null or empty string (should be the case except for java action), this way the user
065        // can override the action's main class via <configuration> property
066        if (javaMainClass != null && !javaMainClass.equals("")) {
067            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
068        }
069    }
070
071    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
072        for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
073            launcherConf.set(entry.getKey(), entry.getValue());
074        }
075    }
076
077    public static void setupMainArguments(Configuration launcherConf, String[] args) {
078        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
079        for (int i = 0; i < args.length; i++) {
080            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
081        }
082    }
083
084    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
085        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
086    }
087
088    /**
089     * Set the maximum value of stats data
090     *
091     * @param launcherConf the oozie launcher configuration
092     * @param maxStatsData the maximum allowed size of stats data
093     */
094    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
095        launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
096    }
097
098    public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
099            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
100
101        launcherConf.setMapperClass(LauncherMapper.class);
102        launcherConf.setSpeculativeExecution(false);
103        launcherConf.setNumMapTasks(1);
104        launcherConf.setNumReduceTasks(0);
105
106        launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
107        launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
108        launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
109        launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
110        launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
111
112        actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
113        actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
114
115        if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
116          List<String> purgedEntries = new ArrayList<String>();
117          Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
118          for (String entry : entries) {
119            if (entry.contains("#")) {
120              purgedEntries.add(entry);
121            }
122          }
123          actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
124          launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
125        }
126
127        FileSystem fs =
128          Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
129                                                                           actionDir.toUri(), launcherConf);
130        fs.mkdirs(actionDir);
131
132        OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
133        actionConf.writeXml(os);
134        os.close();
135
136        Path inputDir = new Path(actionDir, "input");
137        fs.mkdirs(inputDir);
138        Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
139        writer.write("dummy");
140        writer.close();
141
142        launcherConf.set("mapred.input.dir", inputDir.toString());
143        launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
144    }
145
146    public static boolean isMainDone(RunningJob runningJob) throws IOException {
147        return runningJob.isComplete();
148    }
149
150    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
151        boolean succeeded = runningJob.isSuccessful();
152        if (succeeded) {
153            Counters counters = runningJob.getCounters();
154            if (counters != null) {
155                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
156                if (group != null) {
157                    succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
158                }
159            }
160        }
161        return succeeded;
162    }
163
164    public static boolean hasOutputData(RunningJob runningJob) throws IOException {
165        boolean output = false;
166        Counters counters = runningJob.getCounters();
167        if (counters != null) {
168            Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
169            if (group != null) {
170                output = group.getCounter(LauncherMapper.COUNTER_OUTPUT_DATA) == 1;
171            }
172        }
173        return output;
174    }
175
176    /**
177     * Check whether runningJob has stats data or not
178     *
179     * @param runningJob the runningJob
180     * @return returns whether the running Job has stats data or not
181     * @throws IOException
182     */
183    public static boolean hasStatsData(RunningJob runningJob) throws IOException{
184        boolean output = false;
185        Counters counters = runningJob.getCounters();
186        if (counters != null) {
187            Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
188            if (group != null) {
189                output = group.getCounter(LauncherMapper.COUNTER_STATS_DATA) == 1;
190            }
191        }
192        return output;
193    }
194
195    public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
196        boolean swap = false;
197        Counters counters = runningJob.getCounters();
198        if (counters != null) {
199            Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
200            if (group != null) {
201                swap = group.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
202            }
203        }
204        return swap;
205    }
206
207    public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
208            throws IOException, HadoopAccessorException {
209        boolean swap = false;
210
211        XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
212
213        Counters counters = runningJob.getCounters();
214        if (counters != null) {
215            Counters.Group counterGroup = counters.getGroup(LauncherMapper.COUNTER_GROUP);
216            if (counterGroup != null) {
217                swap = counterGroup.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
218            }
219        }
220        // additional check for swapped hadoop ID
221        // Can't rely on hadoop counters existing
222        // we'll check for the newID file in hdfs if the hadoop counters is null
223        else {
224
225            Path p = getIdSwapPath(actionDir);
226            // log.debug("Checking for newId file in: [{0}]", p);
227
228            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
229            Configuration conf = has.createJobConf(p.toUri().getAuthority());
230            FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
231            if (fs.exists(p)) {
232                log.debug("Hadoop Counters is null, but found newID file.");
233
234                swap = true;
235            }
236            else {
237                log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
238            }
239        }
240        return swap;
241    }
242
243    public static Path getOutputDataPath(Path actionDir) {
244        return new Path(actionDir, LauncherMapper.ACTION_OUTPUT_PROPS);
245    }
246
247    /**
248     * Get the location of stats file
249     *
250     * @param actionDir the action directory
251     * @return the hdfs location of the file
252     */
253    public static Path getActionStatsDataPath(Path actionDir){
254        return new Path(actionDir, LauncherMapper.ACTION_STATS_PROPS);
255    }
256
257    /**
258     * Get the location of external Child IDs file
259     *
260     * @param actionDir the action directory
261     * @return the hdfs location of the file
262     */
263    public static Path getExternalChildIDsDataPath(Path actionDir){
264        return new Path(actionDir, LauncherMapper.ACTION_EXTERNAL_CHILD_IDS_PROPS);
265    }
266
267    public static Path getErrorPath(Path actionDir) {
268        return new Path(actionDir, LauncherMapper.ACTION_ERROR_PROPS);
269    }
270
271    public static Path getIdSwapPath(Path actionDir) {
272        return new Path(actionDir, LauncherMapper.ACTION_NEW_ID_PROPS);
273    }
274}