001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.List;
023import java.util.Set;
024
025import javax.servlet.http.HttpServletRequest;
026import javax.servlet.http.HttpServletResponse;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.BaseEngineException;
030import org.apache.oozie.BulkResponseInfo;
031import org.apache.oozie.BundleJobBean;
032import org.apache.oozie.BundleJobInfo;
033import org.apache.oozie.CoordinatorEngine;
034import org.apache.oozie.BundleEngine;
035import org.apache.oozie.CoordinatorEngineException;
036import org.apache.oozie.BundleEngineException;
037import org.apache.oozie.CoordinatorJobBean;
038import org.apache.oozie.CoordinatorJobInfo;
039import org.apache.oozie.DagEngine;
040import org.apache.oozie.DagEngineException;
041import org.apache.oozie.ErrorCode;
042import org.apache.oozie.WorkflowJobBean;
043import org.apache.oozie.WorkflowsInfo;
044import org.apache.oozie.cli.OozieCLI;
045import org.apache.oozie.client.OozieClient;
046import org.apache.oozie.client.rest.BulkResponseImpl;
047import org.apache.oozie.client.rest.JsonTags;
048import org.apache.oozie.client.rest.RestConstants;
049import org.apache.oozie.service.CoordinatorEngineService;
050import org.apache.oozie.service.DagEngineService;
051import org.apache.oozie.service.BundleEngineService;
052import org.apache.oozie.service.Services;
053import org.apache.oozie.util.XLog;
054import org.apache.oozie.util.XmlUtils;
055import org.json.simple.JSONObject;
056
057public class V1JobsServlet extends BaseJobsServlet {
058
059    private static final String INSTRUMENTATION_NAME = "v1jobs";
060    private static final Set<String> httpJobType = new HashSet<String>(){{
061        this.add(OozieCLI.HIVE_CMD);
062        this.add(OozieCLI.PIG_CMD);
063        this.add(OozieCLI.MR_CMD);
064    }};
065
066    public V1JobsServlet() {
067        super(INSTRUMENTATION_NAME);
068    }
069
070    /**
071     * v1 service implementation to submit a job, either workflow or coordinator
072     */
073    @Override
074    protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,
075            IOException {
076        JSONObject json = null;
077
078        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
079
080        if (jobType == null) {
081            String wfPath = conf.get(OozieClient.APP_PATH);
082            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
083            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
084
085            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
086
087            if (wfPath != null) {
088                json = submitWorkflowJob(request, conf);
089            }
090            else if (coordPath != null) {
091                json = submitCoordinatorJob(request, conf);
092            }
093            else {
094                json = submitBundleJob(request, conf);
095            }
096        }
097        else { // This is a http submission job
098            if (httpJobType.contains(jobType)) {
099                json = submitHttpJob(request, conf, jobType);
100            }
101            else {
102                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
103                        RestConstants.JOBTYPE_PARAM, jobType);
104            }
105        }
106        return json;
107    }
108
109    /**
110     * v1 service implementation to get a JSONObject representation of a job from its external ID
111     */
112    @Override
113    protected JSONObject getJobIdForExternalId(HttpServletRequest request, String externalId) throws XServletException,
114            IOException {
115        JSONObject json = null;
116        /*
117         * Configuration conf = new XConfiguration(); String wfPath =
118         * conf.get(OozieClient.APP_PATH); String coordPath =
119         * conf.get(OozieClient.COORDINATOR_APP_PATH);
120         *
121         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
122         */
123        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
124        jobtype = (jobtype != null) ? jobtype : "wf";
125        if (jobtype.contains("wf")) {
126            json = getWorkflowJobIdForExternalId(request, externalId);
127        }
128        else {
129            json = getCoordinatorJobIdForExternalId(request, externalId);
130        }
131        return json;
132    }
133
134    /**
135     * v1 service implementation to get a list of workflows, coordinators, or bundles, with filtering or interested
136     * windows embedded in the request object
137     */
138    @Override
139    protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
140        JSONObject json = null;
141        String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
142        if(isBulk != null) {
143            json = getBulkJobs(request);
144        } else {
145            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
146            jobtype = (jobtype != null) ? jobtype : "wf";
147
148            if (jobtype.contains("wf")) {
149                json = getWorkflowJobs(request);
150            }
151            else if (jobtype.contains("coord")) {
152                json = getCoordinatorJobs(request);
153            }
154            else if (jobtype.contains("bundle")) {
155                json = getBundleJobs(request);
156            }
157        }
158        return json;
159    }
160
161    /**
162     * v1 service implementation to submit a workflow job
163     */
164    @SuppressWarnings("unchecked")
165    private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
166
167        JSONObject json = new JSONObject();
168
169        try {
170            String action = request.getParameter(RestConstants.ACTION_PARAM);
171            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
172                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
173                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
174                        RestConstants.ACTION_PARAM, action);
175            }
176            boolean startJob = (action != null);
177            String user = conf.get(OozieClient.USER_NAME);
178            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
179            String id;
180            boolean dryrun = false;
181            if (action != null) {
182                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
183            }
184            if (dryrun) {
185                id = dagEngine.dryRunSubmit(conf);
186            }
187            else {
188                id = dagEngine.submitJob(conf, startJob);
189            }
190            json.put(JsonTags.JOB_ID, id);
191        }
192        catch (BaseEngineException ex) {
193            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
194        }
195
196        return json;
197    }
198
199    /**
200     * v1 service implementation to submit a coordinator job
201     */
202    @SuppressWarnings("unchecked")
203    private JSONObject submitCoordinatorJob(HttpServletRequest request, Configuration conf) throws XServletException {
204
205        JSONObject json = new JSONObject();
206        XLog.getLog(getClass()).warn("submitCoordinatorJob " + XmlUtils.prettyPrint(conf).toString());
207        try {
208            String action = request.getParameter(RestConstants.ACTION_PARAM);
209            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
210                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
211                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
212                        RestConstants.ACTION_PARAM, action);
213            }
214            boolean startJob = (action != null);
215            String user = conf.get(OozieClient.USER_NAME);
216            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
217                    user);
218            String id = null;
219            boolean dryrun = false;
220            if (action != null) {
221                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
222            }
223            if (dryrun) {
224                id = coordEngine.dryRunSubmit(conf);
225            }
226            else {
227                id = coordEngine.submitJob(conf, startJob);
228            }
229            json.put(JsonTags.JOB_ID, id);
230        }
231        catch (CoordinatorEngineException ex) {
232            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
233        }
234
235        return json;
236    }
237
238    /**
239     * v1 service implementation to submit a bundle job
240     */
241    @SuppressWarnings("unchecked")
242    private JSONObject submitBundleJob(HttpServletRequest request, Configuration conf) throws XServletException {
243        JSONObject json = new JSONObject();
244        XLog.getLog(getClass()).warn("submitBundleJob " + XmlUtils.prettyPrint(conf).toString());
245        try {
246            String action = request.getParameter(RestConstants.ACTION_PARAM);
247            if (action != null && !action.equals(RestConstants.JOB_ACTION_START)
248                    && !action.equals(RestConstants.JOB_ACTION_DRYRUN)) {
249                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
250                        RestConstants.ACTION_PARAM, action);
251            }
252            boolean startJob = (action != null);
253            String user = conf.get(OozieClient.USER_NAME);
254            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user);
255            String id = null;
256            boolean dryrun = false;
257            if (action != null) {
258                dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
259            }
260            if (dryrun) {
261                id = bundleEngine.dryRunSubmit(conf);
262            }
263            else {
264                id = bundleEngine.submitJob(conf, startJob);
265            }
266            json.put(JsonTags.JOB_ID, id);
267        }
268        catch (BundleEngineException ex) {
269            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
270        }
271
272        return json;
273    }
274
275    /**
276     * v1 service implementation to get a JSONObject representation of a job from its external ID
277     */
278    @SuppressWarnings("unchecked")
279    private JSONObject getWorkflowJobIdForExternalId(HttpServletRequest request, String externalId)
280            throws XServletException {
281        JSONObject json = new JSONObject();
282        try {
283            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
284            String jobId = dagEngine.getJobIdForExternalId(externalId);
285            json.put(JsonTags.JOB_ID, jobId);
286        }
287        catch (DagEngineException ex) {
288            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
289        }
290        return json;
291    }
292
293    /**
294     * v1 service implementation to get a JSONObject representation of a job from its external ID
295     */
296    private JSONObject getCoordinatorJobIdForExternalId(HttpServletRequest request, String externalId)
297            throws XServletException {
298        JSONObject json = new JSONObject();
299        return json;
300    }
301
302    /**
303     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
304     * request object
305     */
306    private JSONObject getWorkflowJobs(HttpServletRequest request) throws XServletException {
307        JSONObject json = new JSONObject();
308        try {
309            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
310            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
311            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
312            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
313                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
314            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
315            start = (start < 1) ? 1 : start;
316            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
317            len = (len < 1) ? 50 : len;
318            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
319            WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len);
320            List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows();
321            json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, timeZoneId));
322            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
323            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
324            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
325
326        }
327        catch (DagEngineException ex) {
328            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
329        }
330
331        return json;
332    }
333
334    /**
335     * v1 service implementation to get a list of workflows, with filtering or interested windows embedded in the
336     * request object
337     */
338    @SuppressWarnings("unchecked")
339    private JSONObject getCoordinatorJobs(HttpServletRequest request) throws XServletException {
340        JSONObject json = new JSONObject();
341        try {
342            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
343            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
344            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
345            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
346                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
347            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
348            start = (start < 1) ? 1 : start;
349            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
350            len = (len < 1) ? 50 : len;
351            CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
352                    getUser(request));
353            CoordinatorJobInfo jobs = coordEngine.getCoordJobs(filter, start, len);
354            List<CoordinatorJobBean> jsonJobs = jobs.getCoordJobs();
355            json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jsonJobs, timeZoneId));
356            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
357            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
358            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
359
360        }
361        catch (CoordinatorEngineException ex) {
362            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
363        }
364        return json;
365    }
366
367    @SuppressWarnings("unchecked")
368    private JSONObject getBundleJobs(HttpServletRequest request) throws XServletException {
369        JSONObject json = new JSONObject();
370        try {
371            String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
372            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
373            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
374            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null 
375                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
376            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
377            start = (start < 1) ? 1 : start;
378            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
379            len = (len < 1) ? 50 : len;
380
381            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
382            BundleJobInfo jobs = bundleEngine.getBundleJobs(filter, start, len);
383            List<BundleJobBean> jsonJobs = jobs.getBundleJobs();
384
385            json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jsonJobs, timeZoneId));
386            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
387            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
388            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
389
390        }
391        catch (BundleEngineException ex) {
392            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
393        }
394        return json;
395    }
396
397    @SuppressWarnings("unchecked")
398    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
399        JSONObject json = new JSONObject();
400        try {
401            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
402            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
403            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
404            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
405                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
406            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
407            start = (start < 1) ? 1 : start;
408            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
409            len = (len < 1) ? 50 : len;
410
411            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
412            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
413            List<BulkResponseImpl> responsesToJson = bulkResponse.getResponses();
414
415            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(responsesToJson, timeZoneId));
416            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
417            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
418            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
419
420        }
421        catch (BaseEngineException ex) {
422            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
423        }
424        return json;
425    }
426
427    /**
428     * service implementation to submit a http job
429     */
430    private JSONObject submitHttpJob(HttpServletRequest request, Configuration conf, String jobType)
431            throws XServletException {
432        JSONObject json = new JSONObject();
433
434        try {
435            String user = conf.get(OozieClient.USER_NAME);
436            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
437            String id = dagEngine.submitHttpJob(conf, jobType);
438            json.put(JsonTags.JOB_ID, id);
439        }
440        catch (DagEngineException ex) {
441            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
442        }
443
444        return json;
445    }
446}