001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.client;
019
020import java.io.BufferedReader;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.OutputStream;
025import java.io.PrintStream;
026import java.io.Reader;
027import java.net.HttpURLConnection;
028import java.net.URL;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.LinkedHashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Properties;
038import java.util.concurrent.Callable;
039
040import javax.xml.parsers.DocumentBuilderFactory;
041import javax.xml.transform.Transformer;
042import javax.xml.transform.TransformerFactory;
043import javax.xml.transform.dom.DOMSource;
044import javax.xml.transform.stream.StreamResult;
045
046import org.apache.oozie.BuildInfo;
047import org.apache.oozie.client.rest.JsonTags;
048import org.apache.oozie.client.rest.JsonToBean;
049import org.apache.oozie.client.rest.RestConstants;
050import org.json.simple.JSONArray;
051import org.json.simple.JSONObject;
052import org.json.simple.JSONValue;
053import org.w3c.dom.Document;
054import org.w3c.dom.Element;
055
056/**
 * Client API to submit and manage Oozie workflow jobs against an Oozie instance.
058 * <p/>
059 * This class is thread safe.
060 * <p/>
061 * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods:
062 * <code>[NAME=VALUE][;NAME=VALUE]*</code>.
063 * <p/>
064 * Valid filter names are:
065 * <p/>
 * <ul>
067 * <li>name: the workflow application name from the workflow definition.</li>
068 * <li>user: the user that submitted the job.</li>
069 * <li>group: the group for the job.</li>
070 * <li>status: the status of the job.</li>
071 * </ul>
072 * <p/>
073 * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same
074 * name. Multiple values must be specified as different name value pairs.
075 */
076public class OozieClient {
077
    // ---- Web-services protocol versions; validateWSVersion() prefers the highest one the server lists.

    public static final long WS_PROTOCOL_VERSION_0 = 0;

    public static final long WS_PROTOCOL_VERSION_1 = 1;

    public static final long WS_PROTOCOL_VERSION = 2; // pointer to current version

    // ---- Property / parameter names.
    // NOTE(review): apart from USER_NAME (set by createConfiguration()), the exact consumers of these
    // names are not visible in this part of the file -- confirm against the server documentation.

    /** Property name under which {@link #createConfiguration()} stores the user name. */
    public static final String USER_NAME = "user.name";

    @Deprecated
    public static final String GROUP_NAME = "group.name";

    public static final String JOB_ACL = "oozie.job.acl";

    public static final String APP_PATH = "oozie.wf.application.path";

    public static final String COORDINATOR_APP_PATH = "oozie.coord.application.path";

    public static final String BUNDLE_APP_PATH = "oozie.bundle.application.path";

    public static final String BUNDLE_ID = "oozie.bundle.id";

    public static final String EXTERNAL_ID = "oozie.wf.external.id";

    public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url";

    public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url";

    public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url";

    public static final String RERUN_SKIP_NODES = "oozie.wf.rerun.skip.nodes";

    public static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";

    public static final String LOG_TOKEN = "oozie.wf.log.token";

    public static final String ACTION_MAX_RETRIES = "oozie.wf.action.max.retries";

    public static final String ACTION_RETRY_INTERVAL = "oozie.wf.action.retry.interval";

    // ---- Filter names (the NAME part of the NAME=VALUE filter syntax described in the class Javadoc).

    public static final String FILTER_USER = "user";

    public static final String FILTER_GROUP = "group";

    public static final String FILTER_NAME = "name";

    public static final String FILTER_STATUS = "status";

    public static final String FILTER_FREQUENCY = "frequency";

    public static final String FILTER_ID = "id";

    public static final String FILTER_UNIT = "unit";

    public static final String FILTER_JOBID = "jobid";

    public static final String FILTER_APPNAME = "appname";

    public static final String FILTER_SLA_APPNAME = "app_name";

    // Same literal value as FILTER_ID.
    public static final String FILTER_SLA_ID = "id";

    public static final String FILTER_SLA_PARENT_ID = "parent_id";

    public static final String FILTER_SLA_NOMINAL_START = "nominal_start";

    public static final String FILTER_SLA_NOMINAL_END = "nominal_end";

    // ---- Names for job changes; presumably the NAME part of the value passed to change() -- TODO confirm.

    public static final String CHANGE_VALUE_ENDTIME = "endtime";

    public static final String CHANGE_VALUE_PAUSETIME = "pausetime";

    public static final String CHANGE_VALUE_CONCURRENCY = "concurrency";

    // ---- Further property names.

    public static final String LIBPATH = "oozie.libpath";

    public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";

    public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";
156
157    public static enum SYSTEM_MODE {
158        NORMAL, NOWEBSERVICE, SAFEMODE
159    };
160
    /**
     * debugMode =0 means no debugging. > 0 means debugging on.
     * <p/>
     * When greater than zero, requests (method and URL) and submitted XML configurations are echoed to
     * <code>System.out</code>.
     */
    public int debugMode = 0;

    // Oozie URL given to the constructor, always ending with "/".
    private String baseUrl;
    // baseUrl + "v<version>/"; assigned by validateWSVersion().
    private String protocolUrl;
    // Set to true once validateWSVersion() has completed successfully, so the check runs only once.
    private boolean validatedVersion = false;
    // Protocol versions reported by the server, as parsed from the versions resource.
    private JSONArray supportedVersions;
    // HTTP headers applied to every connection opened by createConnection().
    private final Map<String, String> headers = new HashMap<String, String>();

    // Per-thread user name installed by doAs(); read by createConfiguration() and prepareParams().
    private static ThreadLocal<String> USER_NAME_TL = new ThreadLocal<String>();
173
174    /**
175     * Allows to impersonate other users in the Oozie server. The current user
176     * must be configured as a proxyuser in Oozie.
177     * <p/>
178     * IMPORTANT: impersonation happens only with Oozie client requests done within
179     * doAs() calls.
180     *
181     * @param userName user to impersonate.
182     * @param callable callable with {@link OozieClient} calls impersonating the specified user.
183     * @return any response returned by the {@link Callable#call()} method.
184     * @throws Exception thrown by the {@link Callable#call()} method.
185     */
186    public static <T> T doAs(String userName, Callable<T> callable) throws Exception {
187        notEmpty(userName, "userName");
188        notNull(callable, "callable");
189        try {
190            USER_NAME_TL.set(userName);
191            return callable.call();
192        }
193        finally {
194            USER_NAME_TL.remove();
195        }
196    }
197
198    protected OozieClient() {
199    }
200
201    /**
202     * Create a Workflow client instance.
203     *
204     * @param oozieUrl URL of the Oozie instance it will interact with.
205     */
206    public OozieClient(String oozieUrl) {
207        this.baseUrl = notEmpty(oozieUrl, "oozieUrl");
208        if (!this.baseUrl.endsWith("/")) {
209            this.baseUrl += "/";
210        }
211    }
212
213    /**
214     * Return the Oozie URL of the workflow client instance.
215     * <p/>
216     * This URL is the base URL fo the Oozie system, with not protocol versioning.
217     *
218     * @return the Oozie URL of the workflow client instance.
219     */
220    public String getOozieUrl() {
221        return baseUrl;
222    }
223
224    /**
225     * Return the Oozie URL used by the client and server for WS communications.
226     * <p/>
227     * This URL is the original URL plus the versioning element path.
228     *
229     * @return the Oozie URL used by the client and server for communication.
230     * @throws OozieClientException thrown in the client and the server are not protocol compatible.
231     */
232    public String getProtocolUrl() throws OozieClientException {
233        validateWSVersion();
234        return protocolUrl;
235    }
236
237    /**
238     * @return current debug Mode
239     */
240    public int getDebugMode() {
241        return debugMode;
242    }
243
244    /**
245     * Set debug mode.
246     *
247     * @param debugMode : 0 means no debugging. > 0 means debugging
248     */
249    public void setDebugMode(int debugMode) {
250        this.debugMode = debugMode;
251    }
252
253    private String getBaseURLForVersion(long protocolVersion) throws OozieClientException {
254        try {
255            if (supportedVersions == null) {
256                supportedVersions = getSupportedProtocolVersions();
257            }
258            if (supportedVersions == null) {
259                throw new OozieClientException("HTTP error", "no response message");
260            }
261            if (supportedVersions.contains(protocolVersion)) {
262                return baseUrl + "v" + protocolVersion + "/";
263            }
264            else {
265                throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, "Protocol version "
266                        + protocolVersion + " is not supported");
267            }
268        }
269        catch (IOException e) {
270            throw new OozieClientException(OozieClientException.IO_ERROR, e);
271        }
272    }
273
274    /**
275     * Validate that the Oozie client and server instances are protocol compatible.
276     *
277     * @throws OozieClientException thrown in the client and the server are not protocol compatible.
278     */
279    public synchronized void validateWSVersion() throws OozieClientException {
280        if (!validatedVersion) {
281            try {
282                supportedVersions = getSupportedProtocolVersions();
283                if (supportedVersions == null) {
284                    throw new OozieClientException("HTTP error", "no response message");
285                }
286                if (!supportedVersions.contains(WS_PROTOCOL_VERSION)
287                        && !supportedVersions.contains(WS_PROTOCOL_VERSION_1)
288                        && !supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
289                    StringBuilder msg = new StringBuilder();
290                    msg.append("Supported version [").append(WS_PROTOCOL_VERSION)
291                            .append("] or less, Unsupported versions[");
292                    String separator = "";
293                    for (Object version : supportedVersions) {
294                        msg.append(separator).append(version);
295                    }
296                    msg.append("]");
297                    throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, msg.toString());
298                }
299                if (supportedVersions.contains(WS_PROTOCOL_VERSION)) {
300                    protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION + "/";
301                }
302                else if (supportedVersions.contains(WS_PROTOCOL_VERSION_1)) {
303                    protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_1 + "/";
304                }
305                else {
306                    if (supportedVersions.contains(WS_PROTOCOL_VERSION_0)) {
307                        protocolUrl = baseUrl + "v" + WS_PROTOCOL_VERSION_0 + "/";
308                    }
309                }
310            }
311            catch (IOException ex) {
312                throw new OozieClientException(OozieClientException.IO_ERROR, ex);
313            }
314            validatedVersion = true;
315        }
316    }
317
318    private JSONArray getSupportedProtocolVersions() throws IOException, OozieClientException {
319        JSONArray versions = null;
320        URL url = new URL(baseUrl + RestConstants.VERSIONS);
321        HttpURLConnection conn = createConnection(url, "GET");
322        if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
323            versions = (JSONArray) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
324        }
325        else {
326            handleError(conn);
327        }
328        return versions;
329    }
330
331    /**
332     * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name.
333     *
334     * @return an empty configuration.
335     */
336    public Properties createConfiguration() {
337        Properties conf = new Properties();
338        String userName = USER_NAME_TL.get();
339        if (userName == null) {
340            userName = System.getProperty("user.name");
341        }
342        conf.setProperty(USER_NAME, userName);
343        return conf;
344    }
345
346    /**
347     * Set a HTTP header to be used in the WS requests by the workflow instance.
348     *
349     * @param name header name.
350     * @param value header value.
351     */
352    public void setHeader(String name, String value) {
353        headers.put(notEmpty(name, "name"), notNull(value, "value"));
354    }
355
356    /**
357     * Get the value of a set HTTP header from the workflow instance.
358     *
359     * @param name header name.
360     * @return header value, <code>null</code> if not set.
361     */
362    public String getHeader(String name) {
363        return headers.get(notEmpty(name, "name"));
364    }
365
366    /**
367     * Get the set HTTP header
368     *
369     * @return map of header key and value
370     */
371    public Map<String, String> getHeaders() {
372        return headers;
373    }
374
375    /**
376     * Remove a HTTP header from the workflow client instance.
377     *
378     * @param name header name.
379     */
380    public void removeHeader(String name) {
381        headers.remove(notEmpty(name, "name"));
382    }
383
384    /**
385     * Return an iterator with all the header names set in the workflow instance.
386     *
387     * @return header names.
388     */
389    public Iterator<String> getHeaderNames() {
390        return Collections.unmodifiableMap(headers).keySet().iterator();
391    }
392
393    private URL createURL(Long protocolVersion, String collection, String resource, Map<String, String> parameters)
394            throws IOException, OozieClientException {
395        validateWSVersion();
396        StringBuilder sb = new StringBuilder();
397        if (protocolVersion == null) {
398            sb.append(protocolUrl);
399        }
400        else {
401            sb.append(getBaseURLForVersion(protocolVersion));
402        }
403        sb.append(collection);
404        if (resource != null && resource.length() > 0) {
405            sb.append("/").append(resource);
406        }
407        if (parameters.size() > 0) {
408            String separator = "?";
409            for (Map.Entry<String, String> param : parameters.entrySet()) {
410                if (param.getValue() != null) {
411                    sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=").append(
412                            URLEncoder.encode(param.getValue(), "UTF-8"));
413                    separator = "&";
414                }
415            }
416        }
417        return new URL(sb.toString());
418    }
419
420    private boolean validateCommand(String url) {
421        {
422            if (protocolUrl.contains(baseUrl + "v0")) {
423                if (url.contains("dryrun") || url.contains("jobtype=c") || url.contains("systemmode")) {
424                    return false;
425                }
426            }
427        }
428        return true;
429    }
430
431    /**
432     * Create http connection to oozie server.
433     *
434     * @param url
435     * @param method
436     * @return connection
437     * @throws IOException
438     * @throws OozieClientException
439     */
440    protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
441        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
442        conn.setRequestMethod(method);
443        if (method.equals("POST") || method.equals("PUT")) {
444            conn.setDoOutput(true);
445        }
446        for (Map.Entry<String, String> header : headers.entrySet()) {
447            conn.setRequestProperty(header.getKey(), header.getValue());
448        }
449        return conn;
450    }
451
452    protected abstract class ClientCallable<T> implements Callable<T> {
453        private final String method;
454        private final String collection;
455        private final String resource;
456        private final Map<String, String> params;
457        private final Long protocolVersion;
458
459        public ClientCallable(String method, String collection, String resource, Map<String, String> params) {
460            this(method, null, collection, resource, params);
461        }
462
463        public ClientCallable(String method, Long protocolVersion, String collection, String resource, Map<String, String> params) {
464            this.method = method;
465            this.protocolVersion = protocolVersion;
466            this.collection = collection;
467            this.resource = resource;
468            this.params = params;
469        }
470
471        public T call() throws OozieClientException {
472            try {
473                URL url = createURL(protocolVersion, collection, resource, params);
474                if (validateCommand(url.toString())) {
475                    if (getDebugMode() > 0) {
476                        System.out.println(method + " " + url);
477                    }
478                    HttpURLConnection conn = createConnection(url, method);
479                    return call(conn);
480                }
481                else {
482                    System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
483                            + " Use 'oozie help' for details");
484                    throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
485                }
486            }
487            catch (IOException ex) {
488                throw new OozieClientException(OozieClientException.IO_ERROR, ex);
489            }
490
491        }
492
493        protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException;
494    }
495
496    static void handleError(HttpURLConnection conn) throws IOException, OozieClientException {
497        int status = conn.getResponseCode();
498        String error = conn.getHeaderField(RestConstants.OOZIE_ERROR_CODE);
499        String message = conn.getHeaderField(RestConstants.OOZIE_ERROR_MESSAGE);
500
501        if (error == null) {
502            error = "HTTP error code: " + status;
503        }
504
505        if (message == null) {
506            message = conn.getResponseMessage();
507        }
508        throw new OozieClientException(error, message);
509    }
510
511    static Map<String, String> prepareParams(String... params) {
512        Map<String, String> map = new LinkedHashMap<String, String>();
513        for (int i = 0; i < params.length; i = i + 2) {
514            map.put(params[i], params[i + 1]);
515        }
516        String doAsUserName = USER_NAME_TL.get();
517        if (doAsUserName != null) {
518            map.put(RestConstants.DO_AS_PARAM, doAsUserName);
519        }
520        return map;
521    }
522
523    public void writeToXml(Properties props, OutputStream out) throws IOException {
524        try {
525            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
526            Element conf = doc.createElement("configuration");
527            doc.appendChild(conf);
528            conf.appendChild(doc.createTextNode("\n"));
529            for (String name : props.stringPropertyNames()) { // Properties whose key or value is not of type String are omitted.
530                String value = props.getProperty(name);
531                Element propNode = doc.createElement("property");
532                conf.appendChild(propNode);
533
534                Element nameNode = doc.createElement("name");
535                nameNode.appendChild(doc.createTextNode(name.trim()));
536                propNode.appendChild(nameNode);
537
538                Element valueNode = doc.createElement("value");
539                valueNode.appendChild(doc.createTextNode(value.trim()));
540                propNode.appendChild(valueNode);
541
542                conf.appendChild(doc.createTextNode("\n"));
543            }
544
545            DOMSource source = new DOMSource(doc);
546            StreamResult result = new StreamResult(out);
547            TransformerFactory transFactory = TransformerFactory.newInstance();
548            Transformer transformer = transFactory.newTransformer();
549            transformer.transform(source, result);
550            if (getDebugMode() > 0) {
551                result = new StreamResult(System.out);
552                transformer.transform(source, result);
553                System.out.println();
554            }
555        }
556        catch (Exception e) {
557            throw new IOException(e);
558        }
559    }
560
561    private class JobSubmit extends ClientCallable<String> {
562        private final Properties conf;
563
564        JobSubmit(Properties conf, boolean start) {
565            super("POST", RestConstants.JOBS, "", (start) ? prepareParams(RestConstants.ACTION_PARAM,
566                    RestConstants.JOB_ACTION_START) : prepareParams());
567            this.conf = notNull(conf, "conf");
568        }
569
570        JobSubmit(String jobId, Properties conf) {
571            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
572                    RestConstants.JOB_ACTION_RERUN));
573            this.conf = notNull(conf, "conf");
574        }
575
576        public JobSubmit(Properties conf, String jobActionDryrun) {
577            super("POST", RestConstants.JOBS, "", prepareParams(RestConstants.ACTION_PARAM,
578                    RestConstants.JOB_ACTION_DRYRUN));
579            this.conf = notNull(conf, "conf");
580        }
581
582        @Override
583        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
584            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
585            writeToXml(conf, conn.getOutputStream());
586            if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
587                JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
588                return (String) json.get(JsonTags.JOB_ID);
589            }
590            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
591                handleError(conn);
592            }
593            return null;
594        }
595    }
596
597    /**
598     * Submit a workflow job.
599     *
600     * @param conf job configuration.
601     * @return the job Id.
602     * @throws OozieClientException thrown if the job could not be submitted.
603     */
604    public String submit(Properties conf) throws OozieClientException {
605        return (new JobSubmit(conf, false)).call();
606    }
607
608    private class JobAction extends ClientCallable<Void> {
609
610        JobAction(String jobId, String action) {
611            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action));
612        }
613
614        JobAction(String jobId, String action, String params) {
615            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM, action,
616                    RestConstants.JOB_CHANGE_VALUE, params));
617        }
618
619        @Override
620        protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
621            if (!(conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
622                handleError(conn);
623            }
624            return null;
625        }
626    }
627
628    /**
629     * dryrun for a given job
630     *
631     * @param conf Job configuration.
632     */
633    public String dryrun(Properties conf) throws OozieClientException {
634        return new JobSubmit(conf, RestConstants.JOB_ACTION_DRYRUN).call();
635    }
636
637    /**
638     * Start a workflow job.
639     *
640     * @param jobId job Id.
641     * @throws OozieClientException thrown if the job could not be started.
642     */
643    public void start(String jobId) throws OozieClientException {
644        new JobAction(jobId, RestConstants.JOB_ACTION_START).call();
645    }
646
647    /**
648     * Submit and start a workflow job.
649     *
650     * @param conf job configuration.
651     * @return the job Id.
652     * @throws OozieClientException thrown if the job could not be submitted.
653     */
654    public String run(Properties conf) throws OozieClientException {
655        return (new JobSubmit(conf, true)).call();
656    }
657
658    /**
659     * Rerun a workflow job.
660     *
661     * @param jobId job Id to rerun.
662     * @param conf configuration information for the rerun.
663     * @throws OozieClientException thrown if the job could not be started.
664     */
665    public void reRun(String jobId, Properties conf) throws OozieClientException {
666        new JobSubmit(jobId, conf).call();
667    }
668
669    /**
670     * Suspend a workflow job.
671     *
672     * @param jobId job Id.
673     * @throws OozieClientException thrown if the job could not be suspended.
674     */
675    public void suspend(String jobId) throws OozieClientException {
676        new JobAction(jobId, RestConstants.JOB_ACTION_SUSPEND).call();
677    }
678
679    /**
680     * Resume a workflow job.
681     *
682     * @param jobId job Id.
683     * @throws OozieClientException thrown if the job could not be resume.
684     */
685    public void resume(String jobId) throws OozieClientException {
686        new JobAction(jobId, RestConstants.JOB_ACTION_RESUME).call();
687    }
688
689    /**
690     * Kill a workflow job.
691     *
692     * @param jobId job Id.
693     * @throws OozieClientException thrown if the job could not be killed.
694     */
695    public void kill(String jobId) throws OozieClientException {
696        new JobAction(jobId, RestConstants.JOB_ACTION_KILL).call();
697    }
698
699    /**
700     * Change a coordinator job.
701     *
702     * @param jobId job Id.
703     * @param changeValue change value.
704     * @throws OozieClientException thrown if the job could not be changed.
705     */
706    public void change(String jobId, String changeValue) throws OozieClientException {
707        new JobAction(jobId, RestConstants.JOB_ACTION_CHANGE, changeValue).call();
708    }
709
710    private class JobInfo extends ClientCallable<WorkflowJob> {
711
712        JobInfo(String jobId, int start, int len) {
713            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
714                    RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM, Integer.toString(start),
715                    RestConstants.LEN_PARAM, Integer.toString(len)));
716        }
717
718        @Override
719        protected WorkflowJob call(HttpURLConnection conn) throws IOException, OozieClientException {
720            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
721                Reader reader = new InputStreamReader(conn.getInputStream());
722                JSONObject json = (JSONObject) JSONValue.parse(reader);
723                return JsonToBean.createWorkflowJob(json);
724            }
725            else {
726                handleError(conn);
727            }
728            return null;
729        }
730    }
731
732
733    private class JMSInfo extends ClientCallable<JMSConnectionInfo> {
734
735        JMSInfo() {
736            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_JMS_INFO, prepareParams());
737        }
738
739        protected JMSConnectionInfo call(HttpURLConnection conn) throws IOException, OozieClientException {
740            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
741                Reader reader = new InputStreamReader(conn.getInputStream());
742                JSONObject json = (JSONObject) JSONValue.parse(reader);
743                return JsonToBean.createJMSConnectionInfo(json);
744            }
745            else {
746                handleError(conn);
747            }
748            return null;
749        }
750    }
751
752    private class WorkflowActionInfo extends ClientCallable<WorkflowAction> {
753        WorkflowActionInfo(String actionId) {
754            super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
755                    RestConstants.JOB_SHOW_INFO));
756        }
757
758        @Override
759        protected WorkflowAction call(HttpURLConnection conn) throws IOException, OozieClientException {
760            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
761                Reader reader = new InputStreamReader(conn.getInputStream());
762                JSONObject json = (JSONObject) JSONValue.parse(reader);
763                return JsonToBean.createWorkflowAction(json);
764            }
765            else {
766                handleError(conn);
767            }
768            return null;
769        }
770    }
771
772    /**
773     * Get the info of a workflow job.
774     *
775     * @param jobId job Id.
776     * @return the job info.
777     * @throws OozieClientException thrown if the job info could not be retrieved.
778     */
779    public WorkflowJob getJobInfo(String jobId) throws OozieClientException {
780        return getJobInfo(jobId, 0, 0);
781    }
782
783    /**
784     * Get the JMS Connection info
785     * @return JMSConnectionInfo object
786     * @throws OozieClientException
787     */
788    public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException {
789        return new JMSInfo().call();
790    }
791
792    /**
793     * Get the info of a workflow job and subset actions.
794     *
795     * @param jobId job Id.
796     * @param start starting index in the list of actions belonging to the job
797     * @param len number of actions to be returned
798     * @return the job info.
799     * @throws OozieClientException thrown if the job info could not be retrieved.
800     */
801    public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException {
802        return new JobInfo(jobId, start, len).call();
803    }
804
805    /**
806     * Get the info of a workflow action.
807     *
808     * @param actionId Id.
809     * @return the workflow action info.
810     * @throws OozieClientException thrown if the job info could not be retrieved.
811     */
812    public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException {
813        return new WorkflowActionInfo(actionId).call();
814    }
815
816    /**
817     * Get the log of a workflow job.
818     *
819     * @param jobId job Id.
820     * @return the job log.
821     * @throws OozieClientException thrown if the job info could not be retrieved.
822     */
823    public String getJobLog(String jobId) throws OozieClientException {
824        return new JobLog(jobId).call();
825    }
826
827    /**
828     * Get the log of a job.
829     *
830     * @param jobId job Id.
831     * @param logRetrievalType Based on which filter criteria the log is retrieved
832     * @param logRetrievalScope Value for the retrieval type
833     * @param ps Printstream of command line interface
834     * @throws OozieClientException thrown if the job info could not be retrieved.
835     */
836    public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
837            throws OozieClientException {
838        new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
839    }
840
841    private class JobLog extends JobMetadata {
842        JobLog(String jobId) {
843            super(jobId, RestConstants.JOB_SHOW_LOG);
844        }
845
846        JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
847            super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
848        }
849    }
850
851    /**
852     * Gets the JMS topic name for a particular job
853     * @param jobId given jobId
854     * @return the JMS topic name
855     * @throws OozieClientException
856     */
857    public String getJMSTopicName(String jobId) throws OozieClientException {
858        return new JMSTopic(jobId).call();
859    }
860
861    private class JMSTopic extends ClientCallable<String> {
862
863        JMSTopic(String jobId) {
864            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
865                    RestConstants.JOB_SHOW_JMS_TOPIC));
866        }
867
868        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
869            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
870                Reader reader = new InputStreamReader(conn.getInputStream());
871                JSONObject json = (JSONObject) JSONValue.parse(reader);
872                return (String) json.get(JsonTags.JMS_TOPIC_NAME);
873            }
874            else {
875                handleError(conn);
876            }
877            return null;
878        }
879    }
880
881    /**
882     * Get the definition of a workflow job.
883     *
884     * @param jobId job Id.
885     * @return the job log.
886     * @throws OozieClientException thrown if the job info could not be retrieved.
887     */
888    public String getJobDefinition(String jobId) throws OozieClientException {
889        return new JobDefinition(jobId).call();
890    }
891
892    private class JobDefinition extends JobMetadata {
893
894        JobDefinition(String jobId) {
895            super(jobId, RestConstants.JOB_SHOW_DEFINITION);
896        }
897    }
898
899    private class JobMetadata extends ClientCallable<String> {
900        PrintStream printStream;
901
902        JobMetadata(String jobId, String metaType) {
903            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
904                    metaType));
905        }
906
907        JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
908            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
909                    metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
910                    logRetrievalScope));
911            printStream = ps;
912        }
913
914        @Override
915        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
916            String returnVal = null;
917            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
918                InputStream is = conn.getInputStream();
919                InputStreamReader isr = new InputStreamReader(is);
920                try {
921                    if (printStream != null) {
922                        sendToOutputStream(isr, -1);
923                    }
924                    else {
925                        returnVal = getReaderAsString(isr, -1);
926                    }
927                }
928                finally {
929                    isr.close();
930                }
931            }
932            else {
933                handleError(conn);
934            }
935            return returnVal;
936        }
937
938        /**
939         * Output the log to command line interface
940         *
941         * @param reader reader to read into a string.
942         * @param maxLen max content length allowed, if -1 there is no limit.
943         * @param ps Printstream of command line interface
944         * @throws IOException
945         */
946        private void sendToOutputStream(Reader reader, int maxLen) throws IOException {
947            if (reader == null) {
948                throw new IllegalArgumentException("reader cannot be null");
949            }
950            StringBuilder sb = new StringBuilder();
951            char[] buffer = new char[2048];
952            int read;
953            int count = 0;
954            int noOfCharstoFlush = 1024;
955            while ((read = reader.read(buffer)) > -1) {
956                count += read;
957                if ((maxLen > -1) && (count > maxLen)) {
958                    break;
959                }
960                sb.append(buffer, 0, read);
961                if (sb.length() > noOfCharstoFlush) {
962                    printStream.print(sb.toString());
963                    sb = new StringBuilder("");
964                }
965            }
966            printStream.print(sb.toString());
967        }
968
969        /**
970         * Return a reader as string.
971         * <p/>
972         *
973         * @param reader reader to read into a string.
974         * @param maxLen max content length allowed, if -1 there is no limit.
975         * @return the reader content.
976         * @throws IOException thrown if the resource could not be read.
977         */
978        private String getReaderAsString(Reader reader, int maxLen) throws IOException {
979            if (reader == null) {
980                throw new IllegalArgumentException("reader cannot be null");
981            }
982            StringBuffer sb = new StringBuffer();
983            char[] buffer = new char[2048];
984            int read;
985            int count = 0;
986            while ((read = reader.read(buffer)) > -1) {
987                count += read;
988
989                // read up to maxLen chars;
990                if ((maxLen > -1) && (count > maxLen)) {
991                    break;
992                }
993                sb.append(buffer, 0, read);
994            }
995            reader.close();
996            return sb.toString();
997        }
998    }
999
1000    private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
1001
1002        CoordJobInfo(String jobId, String filter, int start, int len) {
1003            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1004                    RestConstants.JOB_SHOW_INFO, RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM, Integer.toString(start),
1005                    RestConstants.LEN_PARAM, Integer.toString(len)));
1006        }
1007
1008        @Override
1009        protected CoordinatorJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1010            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1011                Reader reader = new InputStreamReader(conn.getInputStream());
1012                JSONObject json = (JSONObject) JSONValue.parse(reader);
1013                return JsonToBean.createCoordinatorJob(json);
1014            }
1015            else {
1016                handleError(conn);
1017            }
1018            return null;
1019        }
1020    }
1021
1022    private class BundleJobInfo extends ClientCallable<BundleJob> {
1023
1024        BundleJobInfo(String jobId) {
1025            super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1026                    RestConstants.JOB_SHOW_INFO));
1027        }
1028
1029        @Override
1030        protected BundleJob call(HttpURLConnection conn) throws IOException, OozieClientException {
1031            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1032                Reader reader = new InputStreamReader(conn.getInputStream());
1033                JSONObject json = (JSONObject) JSONValue.parse(reader);
1034                return JsonToBean.createBundleJob(json);
1035            }
1036            else {
1037                handleError(conn);
1038            }
1039            return null;
1040        }
1041    }
1042
1043    private class CoordActionInfo extends ClientCallable<CoordinatorAction> {
1044        CoordActionInfo(String actionId) {
1045            super("GET", RestConstants.JOB, notEmpty(actionId, "id"), prepareParams(RestConstants.JOB_SHOW_PARAM,
1046                    RestConstants.JOB_SHOW_INFO));
1047        }
1048
1049        @Override
1050        protected CoordinatorAction call(HttpURLConnection conn) throws IOException, OozieClientException {
1051            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1052                Reader reader = new InputStreamReader(conn.getInputStream());
1053                JSONObject json = (JSONObject) JSONValue.parse(reader);
1054                return JsonToBean.createCoordinatorAction(json);
1055            }
1056            else {
1057                handleError(conn);
1058            }
1059            return null;
1060        }
1061    }
1062
1063    /**
1064     * Get the info of a bundle job.
1065     *
1066     * @param jobId job Id.
1067     * @return the job info.
1068     * @throws OozieClientException thrown if the job info could not be retrieved.
1069     */
1070    public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
1071        return new BundleJobInfo(jobId).call();
1072    }
1073
1074    /**
1075     * Get the info of a coordinator job.
1076     *
1077     * @param jobId job Id.
1078     * @return the job info.
1079     * @throws OozieClientException thrown if the job info could not be retrieved.
1080     */
1081    public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException {
1082        return new CoordJobInfo(jobId, null, -1, -1).call();
1083    }
1084
1085    /**
1086     * Get the info of a coordinator job and subset actions.
1087     *
1088     * @param jobId job Id.
1089     * @param filter filter the status filter
1090     * @param start starting index in the list of actions belonging to the job
1091     * @param len number of actions to be returned
1092     * @return the job info.
1093     * @throws OozieClientException thrown if the job info could not be retrieved.
1094     */
1095    public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException {
1096        return new CoordJobInfo(jobId, filter, start, len).call();
1097    }
1098
1099    /**
1100     * Get the info of a coordinator action.
1101     *
1102     * @param actionId Id.
1103     * @return the coordinator action info.
1104     * @throws OozieClientException thrown if the job info could not be retrieved.
1105     */
1106    public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException {
1107        return new CoordActionInfo(actionId).call();
1108    }
1109
1110    private class JobsStatus extends ClientCallable<List<WorkflowJob>> {
1111
1112        JobsStatus(String filter, int start, int len) {
1113            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1114                    RestConstants.JOBTYPE_PARAM, "wf", RestConstants.OFFSET_PARAM, Integer.toString(start),
1115                    RestConstants.LEN_PARAM, Integer.toString(len)));
1116        }
1117
1118        @Override
1119        @SuppressWarnings("unchecked")
1120        protected List<WorkflowJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1121            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1122            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1123                Reader reader = new InputStreamReader(conn.getInputStream());
1124                JSONObject json = (JSONObject) JSONValue.parse(reader);
1125                JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS);
1126                if (workflows == null) {
1127                    workflows = new JSONArray();
1128                }
1129                return JsonToBean.createWorkflowJobList(workflows);
1130            }
1131            else {
1132                handleError(conn);
1133            }
1134            return null;
1135        }
1136    }
1137
1138    private class CoordJobsStatus extends ClientCallable<List<CoordinatorJob>> {
1139
1140        CoordJobsStatus(String filter, int start, int len) {
1141            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1142                    RestConstants.JOBTYPE_PARAM, "coord", RestConstants.OFFSET_PARAM, Integer.toString(start),
1143                    RestConstants.LEN_PARAM, Integer.toString(len)));
1144        }
1145
1146        @Override
1147        protected List<CoordinatorJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1148            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1149            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1150                Reader reader = new InputStreamReader(conn.getInputStream());
1151                JSONObject json = (JSONObject) JSONValue.parse(reader);
1152                JSONArray jobs = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS);
1153                if (jobs == null) {
1154                    jobs = new JSONArray();
1155                }
1156                return JsonToBean.createCoordinatorJobList(jobs);
1157            }
1158            else {
1159                handleError(conn);
1160            }
1161            return null;
1162        }
1163    }
1164
1165    private class BundleJobsStatus extends ClientCallable<List<BundleJob>> {
1166
1167        BundleJobsStatus(String filter, int start, int len) {
1168            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_FILTER_PARAM, filter,
1169                    RestConstants.JOBTYPE_PARAM, "bundle", RestConstants.OFFSET_PARAM, Integer.toString(start),
1170                    RestConstants.LEN_PARAM, Integer.toString(len)));
1171        }
1172
1173        @Override
1174        protected List<BundleJob> call(HttpURLConnection conn) throws IOException, OozieClientException {
1175            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1176            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1177                Reader reader = new InputStreamReader(conn.getInputStream());
1178                JSONObject json = (JSONObject) JSONValue.parse(reader);
1179                JSONArray jobs = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
1180                if (jobs == null) {
1181                    jobs = new JSONArray();
1182                }
1183                return JsonToBean.createBundleJobList(jobs);
1184            }
1185            else {
1186                handleError(conn);
1187            }
1188            return null;
1189        }
1190    }
1191
1192    private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
1193
1194        BulkResponseStatus(String filter, int start, int len) {
1195            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
1196                    RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
1197        }
1198
1199        @Override
1200        protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
1201            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1202            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1203                Reader reader = new InputStreamReader(conn.getInputStream());
1204                JSONObject json = (JSONObject) JSONValue.parse(reader);
1205                JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
1206                if (results == null) {
1207                    results = new JSONArray();
1208                }
1209                return JsonToBean.createBulkResponseList(results);
1210            }
1211            else {
1212                handleError(conn);
1213            }
1214            return null;
1215        }
1216    }
1217
1218    private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
1219
1220        CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
1221            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1222                    RestConstants.JOB_COORD_ACTION_RERUN, RestConstants.JOB_COORD_RERUN_TYPE_PARAM, rerunType,
1223                    RestConstants.JOB_COORD_RERUN_SCOPE_PARAM, scope, RestConstants.JOB_COORD_RERUN_REFRESH_PARAM,
1224                    Boolean.toString(refresh), RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean
1225                            .toString(noCleanup)));
1226        }
1227
1228        @Override
1229        protected List<CoordinatorAction> call(HttpURLConnection conn) throws IOException, OozieClientException {
1230            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1231            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1232                Reader reader = new InputStreamReader(conn.getInputStream());
1233                JSONObject json = (JSONObject) JSONValue.parse(reader);
1234                JSONArray coordActions = (JSONArray) json.get(JsonTags.COORDINATOR_ACTIONS);
1235                return JsonToBean.createCoordinatorActionList(coordActions);
1236            }
1237            else {
1238                handleError(conn);
1239            }
1240            return null;
1241        }
1242    }
1243
1244    private class BundleRerun extends ClientCallable<Void> {
1245
1246        BundleRerun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) {
1247            super("PUT", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.ACTION_PARAM,
1248                    RestConstants.JOB_BUNDLE_ACTION_RERUN, RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM,
1249                    coordScope, RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM, dateScope,
1250                    RestConstants.JOB_COORD_RERUN_REFRESH_PARAM, Boolean.toString(refresh),
1251                    RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM, Boolean.toString(noCleanup)));
1252        }
1253
1254        @Override
1255        protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1256            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1257            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1258                return null;
1259            }
1260            else {
1261                handleError(conn);
1262            }
1263            return null;
1264        }
1265    }
1266
1267    /**
1268     * Rerun coordinator actions.
1269     *
1270     * @param jobId coordinator jobId
1271     * @param rerunType rerun type 'date' if -date is used, 'action-id' if -action is used
1272     * @param scope rerun scope for date or actionIds
1273     * @param refresh true if -refresh is given in command option
1274     * @param noCleanup true if -nocleanup is given in command option
1275     * @throws OozieClientException
1276     */
1277    public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh,
1278            boolean noCleanup) throws OozieClientException {
1279        return new CoordRerun(jobId, rerunType, scope, refresh, noCleanup).call();
1280    }
1281
1282    /**
1283     * Rerun bundle coordinators.
1284     *
1285     * @param jobId bundle jobId
1286     * @param coordScope rerun scope for coordinator jobs
1287     * @param dateScope rerun scope for date
1288     * @param refresh true if -refresh is given in command option
1289     * @param noCleanup true if -nocleanup is given in command option
1290     * @throws OozieClientException
1291     */
1292    public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
1293            throws OozieClientException {
1294        return new BundleRerun(jobId, coordScope, dateScope, refresh, noCleanup).call();
1295    }
1296
1297    /**
1298     * Return the info of the workflow jobs that match the filter.
1299     *
1300     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1301     * @param start jobs offset, base 1.
1302     * @param len number of jobs to return.
1303     * @return a list with the workflow jobs info, without node details.
1304     * @throws OozieClientException thrown if the jobs info could not be retrieved.
1305     */
1306    public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException {
1307        return new JobsStatus(filter, start, len).call();
1308    }
1309
1310    /**
1311     * Return the info of the workflow jobs that match the filter.
1312     * <p/>
1313     * It returns the first 100 jobs that match the filter.
1314     *
1315     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1316     * @return a list with the workflow jobs info, without node details.
1317     * @throws OozieClientException thrown if the jobs info could not be retrieved.
1318     */
1319    public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException {
1320        return getJobsInfo(filter, 1, 50);
1321    }
1322
1323    /**
1324     * Print sla info about coordinator and workflow jobs and actions.
1325     *
1326     * @param start starting offset
1327     * @param len number of results
1328     * @throws OozieClientException
1329     */
1330    public void getSlaInfo(int start, int len, String filter) throws OozieClientException {
1331        new SlaInfo(start, len, filter).call();
1332    }
1333
1334    private class SlaInfo extends ClientCallable<Void> {
1335
1336        SlaInfo(int start, int len, String filter) {
1337            super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID,
1338                    Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len),
1339                    RestConstants.JOBS_FILTER_PARAM, filter));
1340        }
1341
1342        @Override
1343        @SuppressWarnings("unchecked")
1344        protected Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1345            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
1346            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1347                BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
1348                String line = null;
1349                while ((line = br.readLine()) != null) {
1350                    System.out.println(line);
1351                }
1352            }
1353            else {
1354                handleError(conn);
1355            }
1356            return null;
1357        }
1358    }
1359
1360    private class JobIdAction extends ClientCallable<String> {
1361
1362        JobIdAction(String externalId) {
1363            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBTYPE_PARAM, "wf",
1364                    RestConstants.JOBS_EXTERNAL_ID_PARAM, externalId));
1365        }
1366
1367        @Override
1368        @SuppressWarnings("unchecked")
1369        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1370            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1371                Reader reader = new InputStreamReader(conn.getInputStream());
1372                JSONObject json = (JSONObject) JSONValue.parse(reader);
1373                return (String) json.get(JsonTags.JOB_ID);
1374            }
1375            else {
1376                handleError(conn);
1377            }
1378            return null;
1379        }
1380    }
1381
1382    /**
1383     * Return the workflow job Id for an external Id.
1384     * <p/>
1385     * The external Id must have provided at job creation time.
1386     *
1387     * @param externalId external Id given at job creation time.
1388     * @return the workflow job Id for an external Id, <code>null</code> if none.
1389     * @throws OozieClientException thrown if the operation could not be done.
1390     */
1391    public String getJobId(String externalId) throws OozieClientException {
1392        return new JobIdAction(externalId).call();
1393    }
1394
1395    private class SetSystemMode extends ClientCallable<Void> {
1396
1397        public SetSystemMode(SYSTEM_MODE status) {
1398            super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams(
1399                    RestConstants.ADMIN_SYSTEM_MODE_PARAM, status + ""));
1400        }
1401
1402        @Override
1403        public Void call(HttpURLConnection conn) throws IOException, OozieClientException {
1404            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
1405                handleError(conn);
1406            }
1407            return null;
1408        }
1409    }
1410
1411    /**
1412     * Enable or disable safe mode. Used by OozieCLI. In safe mode, Oozie would not accept any commands except status
1413     * command to change and view the safe mode status.
1414     *
1415     * @param status true to enable safe mode, false to disable safe mode.
1416     * @throws OozieClientException if it fails to set the safe mode status.
1417     */
1418    public void setSystemMode(SYSTEM_MODE status) throws OozieClientException {
1419        new SetSystemMode(status).call();
1420    }
1421
1422    private class GetSystemMode extends ClientCallable<SYSTEM_MODE> {
1423
1424        GetSystemMode() {
1425            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_STATUS_RESOURCE, prepareParams());
1426        }
1427
1428        @Override
1429        protected SYSTEM_MODE call(HttpURLConnection conn) throws IOException, OozieClientException {
1430            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1431                Reader reader = new InputStreamReader(conn.getInputStream());
1432                JSONObject json = (JSONObject) JSONValue.parse(reader);
1433                return SYSTEM_MODE.valueOf((String) json.get(JsonTags.OOZIE_SYSTEM_MODE));
1434            }
1435            else {
1436                handleError(conn);
1437            }
1438            return SYSTEM_MODE.NORMAL;
1439        }
1440    }
1441
1442    /**
1443     * Returns if Oozie is in safe mode or not.
1444     *
1445     * @return true if safe mode is ON<br>
1446     *         false if safe mode is OFF
1447     * @throws OozieClientException throw if it could not obtain the safe mode status.
1448     */
1449    /*
1450     * public boolean isInSafeMode() throws OozieClientException { return new GetSafeMode().call(); }
1451     */
1452    public SYSTEM_MODE getSystemMode() throws OozieClientException {
1453        return new GetSystemMode().call();
1454    }
1455
1456    private class GetBuildVersion extends ClientCallable<String> {
1457
1458        GetBuildVersion() {
1459            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_BUILD_VERSION_RESOURCE, prepareParams());
1460        }
1461
1462        @Override
1463        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
1464            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1465                Reader reader = new InputStreamReader(conn.getInputStream());
1466                JSONObject json = (JSONObject) JSONValue.parse(reader);
1467                return (String) json.get(JsonTags.BUILD_VERSION);
1468            }
1469            else {
1470                handleError(conn);
1471            }
1472            return null;
1473        }
1474    }
1475
1476    /**
1477     * Return the Oozie server build version.
1478     *
1479     * @return the Oozie server build version.
1480     * @throws OozieClientException throw if it the server build version could not be retrieved.
1481     */
1482    public String getServerBuildVersion() throws OozieClientException {
1483        return new GetBuildVersion().call();
1484    }
1485
1486    /**
1487     * Return the Oozie client build version.
1488     *
1489     * @return the Oozie client build version.
1490     */
1491    public String getClientBuildVersion() {
1492        return BuildInfo.getBuildInfo().getProperty(BuildInfo.BUILD_VERSION);
1493    }
1494
1495    /**
1496     * Return the info of the coordinator jobs that match the filter.
1497     *
1498     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1499     * @param start jobs offset, base 1.
1500     * @param len number of jobs to return.
1501     * @return a list with the coordinator jobs info
1502     * @throws OozieClientException thrown if the jobs info could not be retrieved.
1503     */
1504    public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException {
1505        return new CoordJobsStatus(filter, start, len).call();
1506    }
1507
1508    /**
1509     * Return the info of the bundle jobs that match the filter.
1510     *
1511     * @param filter job filter. Refer to the {@link OozieClient} for the filter syntax.
1512     * @param start jobs offset, base 1.
1513     * @param len number of jobs to return.
1514     * @return a list with the bundle jobs info
1515     * @throws OozieClientException thrown if the jobs info could not be retrieved.
1516     */
1517    public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException {
1518        return new BundleJobsStatus(filter, start, len).call();
1519    }
1520
1521    public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
1522        return new BulkResponseStatus(filter, start, len).call();
1523    }
1524
1525    private class GetQueueDump extends ClientCallable<List<String>> {
1526        GetQueueDump() {
1527            super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
1528        }
1529
1530        @Override
1531        protected List<String> call(HttpURLConnection conn) throws IOException, OozieClientException {
1532            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
1533                Reader reader = new InputStreamReader(conn.getInputStream());
1534                JSONObject json = (JSONObject) JSONValue.parse(reader);
1535                JSONArray queueDumpArray = (JSONArray) json.get(JsonTags.QUEUE_DUMP);
1536
1537                List<String> list = new ArrayList<String>();
1538                list.add("[Server Queue Dump]:");
1539                for (Object o : queueDumpArray) {
1540                    JSONObject entry = (JSONObject) o;
1541                    if (entry.get(JsonTags.CALLABLE_DUMP) != null) {
1542                        String value = (String) entry.get(JsonTags.CALLABLE_DUMP);
1543                        list.add(value);
1544                    }
1545                }
1546                if (queueDumpArray.size() == 0) {
1547                    list.add("Queue dump is null!");
1548                }
1549
1550                list.add("******************************************");
1551                list.add("[Server Uniqueness Map Dump]:");
1552
1553                JSONArray uniqueDumpArray = (JSONArray) json.get(JsonTags.UNIQUE_MAP_DUMP);
1554                for (Object o : uniqueDumpArray) {
1555                    JSONObject entry = (JSONObject) o;
1556                    if (entry.get(JsonTags.UNIQUE_ENTRY_DUMP) != null) {
1557                        String value = (String) entry.get(JsonTags.UNIQUE_ENTRY_DUMP);
1558                        list.add(value);
1559                    }
1560                }
1561                if (uniqueDumpArray.size() == 0) {
1562                    list.add("Uniqueness dump is null!");
1563                }
1564                return list;
1565            }
1566            else {
1567                handleError(conn);
1568            }
1569            return null;
1570        }
1571    }
1572
1573    /**
1574     * Return the Oozie queue's commands' dump
1575     *
1576     * @return the list of strings of callable identification in queue
1577     * @throws OozieClientException throw if it the queue dump could not be retrieved.
1578     */
1579    public List<String> getQueueDump() throws OozieClientException {
1580        return new GetQueueDump().call();
1581    }
1582
1583    /**
1584     * Check if the string is not null or not empty.
1585     *
1586     * @param str
1587     * @param name
1588     * @return string
1589     */
1590    public static String notEmpty(String str, String name) {
1591        if (str == null) {
1592            throw new IllegalArgumentException(name + " cannot be null");
1593        }
1594        if (str.length() == 0) {
1595            throw new IllegalArgumentException(name + " cannot be empty");
1596        }
1597        return str;
1598    }
1599
1600    /**
1601     * Check if the object is not null.
1602     *
1603     * @param <T>
1604     * @param obj
1605     * @param name
1606     * @return string
1607     */
1608    public static <T> T notNull(T obj, String name) {
1609        if (obj == null) {
1610            throw new IllegalArgumentException(name + " cannot be null");
1611        }
1612        return obj;
1613    }
1614
1615}