001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.client;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.FileReader;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.net.HttpURLConnection;
026import java.util.Properties;
027
028import org.apache.oozie.cli.OozieCLI;
029import org.apache.oozie.client.rest.JsonTags;
030import org.apache.oozie.client.rest.RestConstants;
031import org.json.simple.JSONObject;
032import org.json.simple.JSONValue;
033
034public class XOozieClient extends OozieClient {
035
036    public static final String JT = "mapred.job.tracker";
037    public static final String JT_2 = "mapreduce.jobtracker.address";
038
039    public static final String NN = "fs.default.name";
040    public static final String NN_2 = "fs.defaultFS";
041
042    @Deprecated
043    public static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
044
045    @Deprecated
046    public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
047
048    public static final String PIG_SCRIPT = "oozie.pig.script";
049
050    public static final String PIG_OPTIONS = "oozie.pig.options";
051
052    public static final String PIG_SCRIPT_PARAMS = "oozie.pig.script.params";
053
054    public static final String HIVE_SCRIPT = "oozie.hive.script";
055
056    public static final String HIVE_OPTIONS = "oozie.hive.options";
057
058    public static final String HIVE_SCRIPT_PARAMS = "oozie.hive.script.params";
059
060    public static final String FILES = "oozie.files";
061
062    public static final String ARCHIVES = "oozie.archives";
063
064    public static final String IS_PROXY_SUBMISSION = "oozie.proxysubmission";
065
066    protected XOozieClient() {
067    }
068
069    /**
070     * Create an eXtended Workflow client instance.
071     *
072     * @param oozieUrl URL of the Oozie instance it will interact with.
073     */
074    public XOozieClient(String oozieUrl) {
075        super(oozieUrl);
076    }
077
078    private String readScript(String script) throws IOException {
079        if (!new File(script).exists()) {
080            throw new IOException("Error: script file [" + script + "] does not exist");
081        }
082
083        BufferedReader br = null;
084        try {
085            br = new BufferedReader(new FileReader(script));
086            StringBuilder sb = new StringBuilder();
087            String line;
088            while ((line = br.readLine()) != null) {
089                sb.append(line + "\n");
090            }
091            return sb.toString();
092        }
093        finally {
094            try {
095                br.close();
096            }
097            catch (IOException ex) {
098                System.err.println("Error: " + ex.getMessage());
099            }
100        }
101    }
102
103    static void setStrings(Properties conf, String key, String[] values) {
104        if (values != null) {
105            conf.setProperty(key + ".size", (new Integer(values.length)).toString());
106            for (int i = 0; i < values.length; i++) {
107                conf.setProperty(key + "." + i, values[i]);
108            }
109        }
110    }
111
112    private void validateHttpSubmitConf(Properties conf) {
113        String JT = conf.getProperty(XOozieClient.JT);
114        String JT_2 = conf.getProperty(XOozieClient.JT_2);
115        if (JT == null) {
116            if(JT_2 == null) {
117                throw new RuntimeException("jobtracker is not specified in conf");
118            }
119        }
120
121        String NN = conf.getProperty(XOozieClient.NN);
122        String NN_2 = conf.getProperty(XOozieClient.NN_2);
123        if (NN == null) {
124            if(NN_2 == null) {
125                throw new RuntimeException("namenode is not specified in conf");
126            } else {
127                NN = NN_2;
128            }
129        }
130
131        String libPath = conf.getProperty(LIBPATH);
132        if (libPath == null) {
133            throw new RuntimeException("libpath is not specified in conf");
134        }
135        if (!libPath.contains(":/")) {
136            String newLibPath;
137            if (libPath.startsWith("/")) {
138                if(NN.endsWith("/")) {
139                    newLibPath = NN + libPath.substring(1);
140                } else {
141                    newLibPath = NN + libPath;
142                }
143            } else {
144                throw new RuntimeException("libpath should be absolute");
145            }
146            conf.setProperty(LIBPATH, newLibPath);
147        }
148
149        conf.setProperty(IS_PROXY_SUBMISSION, "true");
150    }
151
152    /**
153     * Submit a Pig job via HTTP.
154     *
155     * @param conf job configuration.
156     * @param pigScriptFile pig script file.
157     * @param pigArgs pig arguments string.
158     * @return the job Id.
159     * @throws OozieClientException thrown if the job could not be submitted.
160     */
161    @Deprecated
162    public String submitPig(Properties conf, String pigScriptFile, String[] pigArgs) throws IOException, OozieClientException {
163        return submitScriptLanguage(conf, pigScriptFile, pigArgs, OozieCLI.PIG_CMD);
164    }
165
166    /**
167     * Submit a Pig or Hive job via HTTP.
168     *
169     * @param conf job configuration.
170     * @param scriptFile  script file.
171     * @param args  arguments string.
172     * @return the job Id.
173     * @throws OozieClientException thrown if the job could not be submitted.
174     */
175    public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String jobType)
176            throws IOException, OozieClientException {
177        return submitScriptLanguage(conf, scriptFile, args, null, jobType);
178    }
179
180    /**
181     * Submit a Pig or Hive job via HTTP.
182     *
183     * @param conf job configuration.
184     * @param scriptFile  script file.
185     * @param args  arguments string.
186     * @param params parameters string.
187     * @return the job Id.
188     * @throws OozieClientException thrown if the job could not be submitted.
189     */
190    public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType)
191            throws IOException, OozieClientException {
192        if (conf == null) {
193            throw new IllegalArgumentException("conf cannot be null");
194        }
195        if (scriptFile == null) {
196            throw new IllegalArgumentException("scriptFile cannot be null");
197        }
198
199        validateHttpSubmitConf(conf);
200
201        String script = "";
202        String options = "";
203        String scriptParams = "";
204
205        if (jobType.equals(OozieCLI.HIVE_CMD)) {
206            script = XOozieClient.HIVE_SCRIPT;
207            options = XOozieClient.HIVE_OPTIONS;
208            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
209        }
210        else if (jobType.equals(OozieCLI.PIG_CMD)) {
211            script =  XOozieClient.PIG_SCRIPT;
212            options = XOozieClient.PIG_OPTIONS;
213            scriptParams = XOozieClient.PIG_SCRIPT_PARAMS;
214        }
215        else {
216            throw new IllegalArgumentException("jobType must be either pig or hive");
217        }
218
219        conf.setProperty(script, readScript(scriptFile));
220        setStrings(conf, options, args);
221        setStrings(conf, scriptParams, params);
222
223        return (new HttpJobSubmit(conf, jobType)).call();
224    }
225
226    /**
227     * Submit a Map/Reduce job via HTTP.
228     *
229     * @param conf job configuration.
230     * @return the job Id.
231     * @throws OozieClientException thrown if the job could not be submitted.
232     */
233    public String submitMapReduce(Properties conf) throws OozieClientException {
234        if (conf == null) {
235            throw new IllegalArgumentException("conf cannot be null");
236        }
237
238        validateHttpSubmitConf(conf);
239
240        return (new HttpJobSubmit(conf, "mapreduce")).call();
241    }
242
243    private class HttpJobSubmit extends ClientCallable<String> {
244        private Properties conf;
245
246        HttpJobSubmit(Properties conf, String jobType) {
247            super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, jobType));
248            this.conf = notNull(conf, "conf");
249        }
250
251        @Override
252        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
253            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
254            writeToXml(conf, conn.getOutputStream());
255            if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
256                JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
257                return (String) json.get(JsonTags.JOB_ID);
258            }
259            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
260                handleError(conn);
261            }
262            return null;
263        }
264    }
265
266    /**
267     * set LIBPATH for HTTP submission job.
268     *
269     * @param conf Configuration object.
270     * @param pathStr lib HDFS path.
271     */
272    public void setLib(Properties conf, String pathStr) {
273        conf.setProperty(LIBPATH, pathStr);
274    }
275
276    /**
277     * The equivalent to <file> tag in oozie's workflow xml.
278     *
279     * @param conf Configuration object.
280     * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
281     *             For example, "/user/oozie/parameter_file#myparams". If no "#..." is specified, file name will be used as
282     *             symbolic link name.
283     */
284    public void addFile(Properties conf, String file) {
285        if (file == null || file.length() == 0) {
286            throw new IllegalArgumentException("file cannot be null or empty");
287        }
288        String files = conf.getProperty(FILES);
289        conf.setProperty(FILES, files == null ? file : files + "," + file);
290    }
291
292    /**
293     * The equivalent to <archive> tag in oozie's workflow xml.
294     *
295     * @param conf Configuration object.
296     * @param file file HDFS path. A "#..." symbolic string can be appended to the path to specify symbolic link name.
297     *             For example, "/user/oozie/udf1.jar#my.jar". If no "#..." is specified, file name will be used as
298     *             symbolic link name.
299     */
300    public void addArchive(Properties conf, String file) {
301        if (file == null || file.length() == 0) {
302            throw new IllegalArgumentException("file cannot be null or empty");
303        }
304        String files = conf.getProperty(ARCHIVES);
305        conf.setProperty(ARCHIVES, files == null ? file : files + "," + file);
306    }
307}