001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.hadoop.fs.FileSystem;
022import org.apache.hadoop.fs.Path;
023import org.apache.hadoop.mapred.RunningJob;
024import org.apache.oozie.action.ActionExecutorException;
025import org.apache.oozie.client.XOozieClient;
026import org.apache.oozie.client.WorkflowAction;
027import org.apache.oozie.service.HadoopAccessorException;
028import org.apache.oozie.util.IOUtils;
029import org.apache.oozie.util.XLog;
030import org.jdom.Element;
031import org.jdom.Namespace;
032import org.jdom.JDOMException;
033
034import java.io.BufferedReader;
035import java.io.IOException;
036import java.io.InputStream;
037import java.io.InputStreamReader;
038import java.net.URISyntaxException;
039import java.util.List;
040
041public class PigActionExecutor extends ScriptLanguageActionExecutor {
042
043    private static final String PIG_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.PigMain";
044    private static final String OOZIE_PIG_STATS = "org.apache.oozie.action.hadoop.OoziePigStats";
045    static final String PIG_SCRIPT = "oozie.pig.script";
046    static final String PIG_PARAMS = "oozie.pig.params";
047    static final String PIG_ARGS = "oozie.pig.args";
048
049    public PigActionExecutor() {
050        super("pig");
051    }
052
053    @Override
054    protected List<Class> getLauncherClasses() {
055        List<Class> classes = super.getLauncherClasses();
056        try {
057            classes.add(Class.forName(PIG_MAIN_CLASS_NAME));
058            classes.add(Class.forName(OOZIE_PIG_STATS));
059        }
060        catch (ClassNotFoundException e) {
061            throw new RuntimeException("Class not found", e);
062        }
063        return classes;
064    }
065
066
067    @Override
068    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
069        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PIG_MAIN_CLASS_NAME);
070    }
071
072    @Override
073    void injectActionCallback(Context context, Configuration launcherConf) {
074    }
075
076    @Override
077    @SuppressWarnings("unchecked")
078    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
079            throws ActionExecutorException {
080        super.setupActionConf(actionConf, context, actionXml, appPath);
081        Namespace ns = actionXml.getNamespace();
082
083        String script = actionXml.getChild("script", ns).getTextTrim();
084        String pigName = new Path(script).getName();
085
086        List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
087        String[] strParams = new String[params.size()];
088        for (int i = 0; i < params.size(); i++) {
089            strParams[i] = params.get(i).getTextTrim();
090        }
091        String[] strArgs = null;
092        List<Element> eArgs = actionXml.getChildren("argument", ns);
093        if (eArgs != null && eArgs.size() > 0) {
094            strArgs = new String[eArgs.size()];
095            for (int i = 0; i < eArgs.size(); i++) {
096                strArgs[i] = eArgs.get(i).getTextTrim();
097            }
098        }
099        setPigScript(actionConf, pigName, strParams, strArgs);
100        return actionConf;
101    }
102
103    public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
104        conf.set(PIG_SCRIPT, script);
105        MapReduceMain.setStrings(conf, PIG_PARAMS, params);
106        MapReduceMain.setStrings(conf, PIG_ARGS, args);
107    }
108
109
110    @Override
111    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
112        return false;
113    }
114
115    /**
116     * Get the stats and external child IDs for a pig job
117     *
118     * @param actionFs the FileSystem object
119     * @param runningJob the runningJob
120     * @param action the Workflow action
121     * @param context executor context
122     *
123     */
124    @Override
125    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
126        super.getActionData(actionFs, runningJob, action, context);
127        String stats = getStats(context, actionFs);
128        context.setExecutionStats(stats);
129        String externalChildIDs = getExternalChildIDs(context, actionFs);
130        context.setExternalChildIDs(externalChildIDs);
131    }
132
133    private String getStats(Context context, FileSystem actionFs) throws IOException, HadoopAccessorException,
134            URISyntaxException {
135        Path actionOutput = LauncherMapperHelper.getActionStatsDataPath(context.getActionDir());
136        String stats = null;
137        if (actionFs.exists(actionOutput)) {
138            stats = getDataFromPath(actionOutput, actionFs);
139
140        }
141        return stats;
142    }
143
144    @Override
145    protected void setActionCompletionData(Context context, FileSystem fs) throws HadoopAccessorException, IOException,
146            URISyntaxException {
147        String data = getExternalChildIDs(context, fs);
148        context.setExternalChildIDs(data);
149    }
150
151    private String getExternalChildIDs(Context context, FileSystem actionFs) throws IOException,
152            HadoopAccessorException, URISyntaxException {
153        Path actionOutput = LauncherMapperHelper.getExternalChildIDsDataPath(context.getActionDir());
154        String externalIDs = null;
155        if (actionFs.exists(actionOutput)) {
156            externalIDs = getDataFromPath(actionOutput, actionFs);
157            XLog.getLog(getClass()).info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
158        }
159        return externalIDs;
160    }
161
162    private static String getDataFromPath(Path actionOutput, FileSystem actionFs) throws IOException{
163        BufferedReader reader = null;
164        String data = null;
165        try {
166            InputStream is = actionFs.open(actionOutput);
167            reader = new BufferedReader(new InputStreamReader(is));
168            data = IOUtils.getReaderAsString(reader, -1);
169
170        }
171        finally {
172            if (reader != null) {
173                reader.close();
174            }
175        }
176        return data;
177    }
178
179    /**
180     * Return the sharelib postfix for the action.
181     *
182     * @return returns <code>pig</code>.
183     * @param actionXml
184     */
185    @Override
186    protected String getDefaultShareLibName(Element actionXml) {
187        return "pig";
188    }
189
190    protected String getScriptName() {
191        return XOozieClient.PIG_SCRIPT;
192    }
193
194}