/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;

public class SqoopActionExecutor extends JavaActionExecutor {

    public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
    private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
    static final String SQOOP_ARGS = "oozie.sqoop.args";

    public SqoopActionExecutor() {
        super("sqoop");
    }

    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
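        // Note: SqoopMain is loaded reflectively by name, presumably so that this
        // class does not need a compile-time dependency on the module providing it.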
        try {
            classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    @Override
    protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SQOOP_MAIN_CLASS_NAME);
    }

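    /**
     * Adds the Sqoop-specific settings to the action configuration: the inline
     * {@code <configuration>} block is copied in, and the Sqoop invocation is read
     * either from a single {@code <command>} element (split on spaces) or from a list
     * of {@code <arg>} elements, then stored under {@value #SQOOP_ARGS}.
     * <p>
     * As an illustrative sketch (the element values below are made up), an action such as
     * <pre>{@code
     * <sqoop xmlns="uri:oozie:sqoop-action:0.2">
     *     <job-tracker>${jobTracker}</job-tracker>
     *     <name-node>${nameNode}</name-node>
     *     <command>import --connect jdbc:mysql://db.example.com/mydb --table MY_TABLE</command>
     * </sqoop>
     * }</pre>
     * yields the argument list {@code [import, --connect, jdbc:mysql://db.example.com/mydb,
     * --table, MY_TABLE]}. Because the command is split on spaces, arguments containing
     * embedded spaces must be passed as individual {@code <arg>} elements instead.
     */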
    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
            throws ActionExecutorException {
        super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        try {
            Element e = actionXml.getChild("configuration", ns);
            if (e != null) {
                String strConf = XmlUtils.prettyPrint(e).toString();
                XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                checkForDisallowedProps(inlineConf, "inline configuration");
                XConfiguration.copy(inlineConf, actionConf);
            }
        }
        catch (IOException ex) {
            throw convertException(ex);
        }

        String[] args;
        if (actionXml.getChild("command", ns) != null) {
            String command = actionXml.getChild("command", ns).getTextTrim();
            StringTokenizer st = new StringTokenizer(command, " ");
            List<String> l = new ArrayList<String>();
            while (st.hasMoreTokens()) {
                l.add(st.nextToken());
            }
            args = l.toArray(new String[l.size()]);
        }
        else {
            List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
            args = new String[eArgs.size()];
            for (int i = 0; i < eArgs.size(); i++) {
                args[i] = eArgs.get(i).getTextTrim();
            }
        }

        setSqoopCommand(actionConf, args);
        return actionConf;
    }

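    /**
     * Stores the Sqoop arguments in the action configuration under
     * {@value #SQOOP_ARGS}, where the Sqoop main class is expected to read them back.
     */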
    private void setSqoopCommand(Configuration conf, String[] args) {
        MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
    }

    /**
     * Gathers the counters from all Hadoop jobs executed by the action (i.e. the jobs
     * that moved data, not the launcher itself) and merges them together. Most of the
     * time there is only one such job; the exception is the import-all-tables command,
     * which executes one job per imported table.
     *
     * @param context Action context
     * @param action Workflow action
     * @throws ActionExecutorException if a child Hadoop job cannot be found or the
     *         counters cannot be processed
     */
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        super.end(context, action);
        JobClient jobClient = null;

        boolean exception = false;
        try {
            if (action.getStatus() == WorkflowAction.Status.OK) {
                Element actionXml = XmlUtils.parseXml(action.getConf());
                JobConf jobConf = createBaseHadoopConf(context, actionXml);
                jobClient = createJobClient(context, jobConf);

                // Cumulative counters for all Sqoop mapreduce jobs
                Counters counters = null;

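                // The external child IDs are the Hadoop job IDs promoted by
                // getActionData(), stored as a comma-separated list.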
                String externalIds = action.getExternalChildIDs();
                String[] jobIds = externalIds.split(",");

                for (String jobId : jobIds) {
                    RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
                    if (runningJob == null) {
                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "SQOOP001",
                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!",
                                action.getExternalId(), action.getId());
                    }

                    Counters taskCounters = runningJob.getCounters();
                    if (taskCounters != null) {
                        if (counters == null) {
                            counters = taskCounters;
                        }
                        else {
                            counters.incrAllCounters(taskCounters);
                        }
                    }
                    else {
                        XLog.getLog(getClass()).warn("Could not find Hadoop Counters for job: [{0}]", jobId);
                    }
                }

                if (counters != null) {
                    ActionStats stats = new MRStats(counters);
                    String statsJsonString = stats.toJSON();
                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, statsJsonString);

                    // Store the action stats only if the user has not disabled the
                    // external stats write property and the stats JSON does not
                    // exceed the maximum allowed size
                    if (Boolean.parseBoolean(evaluateConfigurationProperty(actionXml,
                            OOZIE_ACTION_EXTERNAL_STATS_WRITE, "true"))
                            && (statsJsonString.getBytes().length <= getMaxExternalStatsSize())) {
                        context.setExecutionStats(statsJsonString);
                        log.debug("Printing stats for sqoop action as a JSON string : [{0}]", statsJsonString);
                    }
                }
                else {
                    context.setVar(MapReduceActionExecutor.HADOOP_COUNTERS, "");
                    XLog.getLog(getClass()).warn("Can't find any associated Hadoop job counters");
                }
            }
        }
        catch (Exception ex) {
            exception = true;
            throw convertException(ex);
        }
        finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                }
                catch (Exception e) {
                    if (exception) {
                        log.error("JobClient error: ", e);
                    }
                    else {
                        throw convertException(e);
                    }
                }
            }
        }
    }

    /**
     * Returns the value of the given property from the inline {@code <configuration>}
     * element of the action XML, or the supplied default if the element or the
     * property is not present.
     *
     * @param actionConf the action XML element
     * @param key the configuration property to look up
     * @param defaultValue the value to return when the property is not set
     * @return the resolved property value
     * @throws ActionExecutorException if the inline configuration cannot be parsed
     */
    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
            throws ActionExecutorException {
        try {
            if (actionConf != null) {
                Namespace ns = actionConf.getNamespace();
                Element e = actionConf.getChild("configuration", ns);

                if (e != null) {
                    String strConf = XmlUtils.prettyPrint(e).toString();
                    XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
                    return inlineConf.get(key, defaultValue);
                }
            }
            return defaultValue;
        }
        catch (IOException ex) {
            throw convertException(ex);
        }
    }

    /**
     * Retrieves the action data (including any stats gathered by the parent executor)
     * and promotes the stored Hadoop job IDs as the external child IDs of the action.
     *
     * @param actionFs the FileSystem object
     * @param runningJob the launcher job
     * @param action the Workflow action
     * @param context executor context
     * @throws HadoopAccessorException if the filesystem cannot be accessed
     * @throws JDOMException if the action XML cannot be parsed
     * @throws IOException if the stored job IDs cannot be read
     * @throws URISyntaxException if a URI in the action configuration is malformed
     */
    @Override
    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        super.getActionData(actionFs, runningJob, action, context);

        // Load the stored Hadoop job IDs and promote them as external child IDs
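        // The data is expected to be in java.util.Properties text format, with the job
        // IDs stored under LauncherMain.HADOOP_JOBS as a comma-separated list that
        // end() later splits on ",".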
        Properties props = new Properties();
        props.load(new StringReader(action.getData()));
        context.setExternalChildIDs((String) props.get(LauncherMain.HADOOP_JOBS));
    }

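    /**
     * Output capture is always enabled for this action; the launcher's captured output
     * is presumably how the IDs of the Hadoop jobs started by Sqoop are handed back to
     * Oozie (see {@link #getActionData}).
     */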
    @Override
    protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException {
        return true;
    }

    /**
     * Return the sharelib name for the action.
     *
     * @param actionXml the action XML element (not used)
     * @return <code>sqoop</code>
     */
    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "sqoop";
    }

}