001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.List;
022import javax.servlet.ServletInputStream;
023import javax.servlet.http.HttpServletRequest;
024import javax.servlet.http.HttpServletResponse;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.*;
027import org.apache.oozie.client.WorkflowAction;
028import org.apache.oozie.client.WorkflowJob;
029import org.apache.oozie.client.rest.*;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.coord.CoordRerunXCommand;
032import org.apache.oozie.service.BundleEngineService;
033import org.apache.oozie.service.CoordinatorEngineService;
034import org.apache.oozie.service.DagEngineService;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.util.GraphGenerator;
037import org.apache.oozie.util.XLog;
038import org.json.simple.JSONObject;
039
040
041@SuppressWarnings("serial")
042public class V1JobServlet extends BaseJobServlet {
043
044    private static final String INSTRUMENTATION_NAME = "v1job";
045    public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";
046
    /**
     * Creates the servlet, passing {@link #INSTRUMENTATION_NAME} to the base servlet constructor.
     */
    public V1JobServlet() {
        super(INSTRUMENTATION_NAME);
    }
050
    /**
     * Creates the servlet with a caller-supplied instrumentation name, which is passed unchanged to the base
     * servlet constructor.
     *
     * @param instrumentation_name instrumentation name handed to the base servlet
     */
    protected V1JobServlet(String instrumentation_name){
        super(instrumentation_name);
    }
054
055    /*
056     * protected method to start a job
057     */
058    @Override
059    protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
060            IOException {
061        /*
062         * Configuration conf = new XConfiguration(request.getInputStream());
063         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
064         * conf.get(OozieClient.COORDINATOR_APP_PATH);
065         *
066         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
067         */
068        String jobId = getResourceName(request);
069        if (jobId.endsWith("-W")) {
070            startWorkflowJob(request, response);
071        }
072        else if (jobId.endsWith("-B")) {
073            startBundleJob(request, response);
074        }
075        else {
076            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
077        }
078
079    }
080
081    /*
082     * protected method to resume a job
083     */
084    @Override
085    protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
086            IOException {
087        /*
088         * Configuration conf = new XConfiguration(request.getInputStream());
089         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
090         * conf.get(OozieClient.COORDINATOR_APP_PATH);
091         *
092         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
093         */
094        String jobId = getResourceName(request);
095        if (jobId.endsWith("-W")) {
096            resumeWorkflowJob(request, response);
097        }
098        else if (jobId.endsWith("-B")) {
099            resumeBundleJob(request, response);
100        }
101        else {
102            resumeCoordinatorJob(request, response);
103        }
104    }
105
106    /*
107     * protected method to suspend a job
108     */
109    @Override
110    protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
111            IOException {
112        /*
113         * Configuration conf = new XConfiguration(request.getInputStream());
114         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
115         * conf.get(OozieClient.COORDINATOR_APP_PATH);
116         *
117         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
118         */
119        String jobId = getResourceName(request);
120        if (jobId.endsWith("-W")) {
121            suspendWorkflowJob(request, response);
122        }
123        else if (jobId.endsWith("-B")) {
124            suspendBundleJob(request, response);
125        }
126        else {
127            suspendCoordinatorJob(request, response);
128        }
129    }
130
131    /*
132     * protected method to kill a job
133     */
134    @Override
135    protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
136            IOException {
137        /*
138         * Configuration conf = new XConfiguration(request.getInputStream());
139         * String wfPath = conf.get(OozieClient.APP_PATH); String coordPath =
140         * conf.get(OozieClient.COORDINATOR_APP_PATH);
141         *
142         * ServletUtilities.ValidateAppPath(wfPath, coordPath);
143         */
144        String jobId = getResourceName(request);
145        if (jobId.endsWith("-W")) {
146            killWorkflowJob(request, response);
147        }
148        else if (jobId.endsWith("-B")) {
149            killBundleJob(request, response);
150        }
151        else {
152            killCoordinatorJob(request, response);
153        }
154    }
155
156    /**
157     * protected method to change a coordinator job
158     * @param request request object
159     * @param response response object
160     * @throws XServletException
161     * @throws IOException
162     */
163    @Override
164    protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
165            IOException {
166        String jobId = getResourceName(request);
167        if (jobId.endsWith("-B")) {
168            changeBundleJob(request, response);
169        }
170        else {
171            changeCoordinatorJob(request, response);
172        }
173    }
174
175    /*
176     * protected method to reRun a job
177     *
178     * @seeorg.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.
179     * HttpServletRequest, javax.servlet.http.HttpServletResponse,
180     * org.apache.hadoop.conf.Configuration)
181     */
182    @Override
183    protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
184            throws XServletException, IOException {
185        JSONObject json = null;
186        String jobId = getResourceName(request);
187        if (jobId.endsWith("-W")) {
188            reRunWorkflowJob(request, response, conf);
189        }
190        else if (jobId.endsWith("-B")) {
191            rerunBundleJob(request, response, conf);
192        }
193        else {
194            json = reRunCoordinatorActions(request, response, conf);
195        }
196        return json;
197    }
198
199    /*
200     * protected method to get a job in JsonBean representation
201     */
202    @Override
203    protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
204            IOException, BaseEngineException {
205        ServletInputStream is = request.getInputStream();
206        byte[] b = new byte[101];
207        while (is.readLine(b, 0, 100) != -1) {
208            XLog.getLog(getClass()).warn("Printing :" + new String(b));
209        }
210
211        JsonBean jobBean = null;
212        String jobId = getResourceName(request);
213        if (jobId.endsWith("-B")) {
214            jobBean = getBundleJob(request, response);
215        }
216        else {
217            if (jobId.endsWith("-W")) {
218                jobBean = getWorkflowJob(request, response);
219            }
220            else {
221                if (jobId.contains("-W@")) {
222                    jobBean = getWorkflowAction(request, response);
223                }
224                else {
225                    if (jobId.contains("-C@")) {
226                        jobBean = getCoordinatorAction(request, response);
227                    }
228                    else {
229                        jobBean = getCoordinatorJob(request, response);
230                    }
231                }
232            }
233        }
234
235        return jobBean;
236    }
237
238    /*
239     * protected method to get a job definition in String format
240     */
241    @Override
242    protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
243            throws XServletException, IOException {
244        String jobDefinition = null;
245        String jobId = getResourceName(request);
246        if (jobId.endsWith("-W")) {
247            jobDefinition = getWorkflowJobDefinition(request, response);
248        }
249        else if (jobId.endsWith("-B")) {
250            jobDefinition = getBundleJobDefinition(request, response);
251        }
252        else {
253            jobDefinition = getCoordinatorJobDefinition(request, response);
254        }
255        return jobDefinition;
256    }
257
258    /*
259     * protected method to stream a job log into response object
260     */
261    @Override
262    protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
263            IOException {
264        String jobId = getResourceName(request);
265        if (jobId.endsWith("-W")) {
266            streamWorkflowJobLog(request, response);
267        }
268        else if (jobId.endsWith("-B")) {
269            streamBundleJob(request, response);
270        }
271        else {
272            streamCoordinatorJobLog(request, response);
273        }
274    }
275
276    @Override
277    protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
278            throws XServletException, IOException {
279        String jobId = getResourceName(request);
280        if (jobId.endsWith("-W")) {
281            // Applicable only to worflow, for now
282            response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
283            try {
284                String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
285                boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1") || showKill.equalsIgnoreCase("true"));
286
287                new GraphGenerator(
288                        getWorkflowJobDefinition(request, response),
289                        (JsonWorkflowJob)getWorkflowJob(request, response),
290                        sK).write(response.getOutputStream());
291            }
292            catch (Exception e) {
293                throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307, e.getMessage(), e);
294            }
295        }
296        else {
297            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
298        }
299    }
300
301    /**
302     * Start wf job
303     *
304     * @param request servlet request
305     * @param response servlet response
306     * @throws XServletException
307     */
308    private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
309        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
310
311        String jobId = getResourceName(request);
312        try {
313            dagEngine.start(jobId);
314        }
315        catch (DagEngineException ex) {
316            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
317        }
318    }
319
320    /**
321     * Start bundle job
322     *
323     * @param request servlet request
324     * @param response servlet response
325     * @throws XServletException
326     */
327    private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
328        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
329        String jobId = getResourceName(request);
330        try {
331            bundleEngine.start(jobId);
332        }
333        catch (BundleEngineException ex) {
334            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
335        }
336    }
337
338    /**
339     * Resume workflow job
340     *
341     * @param request servlet request
342     * @param response servlet response
343     * @throws XServletException
344     */
345    private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
346        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
347
348        String jobId = getResourceName(request);
349        try {
350            dagEngine.resume(jobId);
351        }
352        catch (DagEngineException ex) {
353            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
354        }
355    }
356
357    /**
358     * Resume bundle job
359     *
360     * @param request servlet request
361     * @param response servlet response
362     * @throws XServletException
363     */
364    private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
365        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
366        String jobId = getResourceName(request);
367        try {
368            bundleEngine.resume(jobId);
369        }
370        catch (BundleEngineException ex) {
371            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
372        }
373    }
374
375    /**
376     * Resume coordinator job
377     *
378     * @param request servlet request
379     * @param response servlet response
380     * @throws XServletException
381     * @throws CoordinatorEngineException
382     */
383    private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
384            throws XServletException {
385        String jobId = getResourceName(request);
386        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
387                getUser(request));
388        try {
389            coordEngine.resume(jobId);
390        }
391        catch (CoordinatorEngineException ex) {
392            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
393        }
394    }
395
396    /**
397     * Suspend a wf job
398     *
399     * @param request servlet request
400     * @param response servlet response
401     * @throws XServletException
402     */
403    private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
404        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
405
406        String jobId = getResourceName(request);
407        try {
408            dagEngine.suspend(jobId);
409        }
410        catch (DagEngineException ex) {
411            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
412        }
413    }
414
415    /**
416     * Suspend bundle job
417     *
418     * @param request servlet request
419     * @param response servlet response
420     * @throws XServletException
421     */
422    private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
423        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
424        String jobId = getResourceName(request);
425        try {
426            bundleEngine.suspend(jobId);
427        }
428        catch (BundleEngineException ex) {
429            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
430        }
431    }
432
433    /**
434     * Suspend coordinator job
435     *
436     * @param request servlet request
437     * @param response servlet response
438     * @throws XServletException
439     */
440    private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
441            throws XServletException {
442        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
443                getUser(request));
444        String jobId = getResourceName(request);
445        try {
446            coordEngine.suspend(jobId);
447        }
448        catch (CoordinatorEngineException ex) {
449            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
450        }
451    }
452
453    /**
454     * Kill a wf job
455     * @param request servlet request
456     * @param response servlet response
457     * @throws XServletException
458     */
459    private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
460        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
461
462        String jobId = getResourceName(request);
463        try {
464            dagEngine.kill(jobId);
465        }
466        catch (DagEngineException ex) {
467            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
468        }
469    }
470
471    /**
472     * Kill a coord job
473     * @param request servlet request
474     * @param response servlet response
475     * @throws XServletException
476     */
477    private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
478        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
479                getUser(request));
480        String jobId = getResourceName(request);
481        try {
482            coordEngine.kill(jobId);
483        }
484        catch (CoordinatorEngineException ex) {
485            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
486        }
487    }
488
489    /**
490     * Kill bundle job
491     *
492     * @param request servlet request
493     * @param response servlet response
494     * @throws XServletException
495     */
496    private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
497        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
498        String jobId = getResourceName(request);
499        try {
500            bundleEngine.kill(jobId);
501        }
502        catch (BundleEngineException ex) {
503            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
504        }
505    }
506
507    /**
508     * Change a coordinator job
509     *
510     * @param request servlet request
511     * @param response servlet response
512     * @throws XServletException
513     */
514    private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
515            throws XServletException {
516        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
517                getUser(request));
518        String jobId = getResourceName(request);
519        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
520        try {
521            coordEngine.change(jobId, changeValue);
522        }
523        catch (CoordinatorEngineException ex) {
524            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
525        }
526    }
527
528    /**
529     * Change a bundle job
530     *
531     * @param request servlet request
532     * @param response servlet response
533     * @throws XServletException
534     */
535    private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
536            throws XServletException {
537        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
538        String jobId = getResourceName(request);
539        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
540        try {
541            bundleEngine.change(jobId, changeValue);
542        }
543        catch (BundleEngineException ex) {
544            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
545        }
546    }
547
548    /**
549     * Rerun a wf job
550     *
551     * @param request servlet request
552     * @param response servlet response
553     * @param conf configuration object
554     * @throws XServletException
555     */
556    private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
557            throws XServletException {
558        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
559
560        String jobId = getResourceName(request);
561        try {
562            dagEngine.reRun(jobId, conf);
563        }
564        catch (DagEngineException ex) {
565            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
566        }
567    }
568
569    /**
570     * Rerun bundle job
571     *
572     * @param request servlet request
573     * @param response servlet response
574     * @param conf configration object
575     * @throws XServletException
576     */
577    private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
578            throws XServletException {
579        JSONObject json = new JSONObject();
580        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
581        String jobId = getResourceName(request);
582
583        String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
584        String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
585        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
586        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
587
588        XLog.getLog(getClass()).info(
589                "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope + ", refresh="
590                        + refresh + ", noCleanup=" + noCleanup);
591
592        try {
593            bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
594        }
595        catch (BaseEngineException ex) {
596            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
597        }
598    }
599
600    /**
601     * Rerun coordinator actions
602     *
603     * @param request servlet request
604     * @param response servlet response
605     * @param conf configuration object
606     * @throws XServletException
607     */
608    @SuppressWarnings("unchecked")
609    private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
610            Configuration conf) throws XServletException {
611        JSONObject json = new JSONObject();
612        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request));
613
614        String jobId = getResourceName(request);
615
616        String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
617        String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
618        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
619        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);
620
621        XLog.getLog(getClass()).info(
622                "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
623                        + refresh + ", noCleanup=" + noCleanup);
624
625        try {
626            if (!(rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE) || rerunType
627                    .equals(RestConstants.JOB_COORD_RERUN_ACTION))) {
628                throw new CommandException(ErrorCode.E1018, "date or action expected.");
629            }
630            CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
631                    Boolean.valueOf(noCleanup));
632            List<CoordinatorActionBean> coordActions;
633            if (coordInfo != null) {
634                coordActions = coordInfo.getCoordActions();
635            }
636            else {
637                coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
638            }
639            json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
640        }
641        catch (BaseEngineException ex) {
642            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
643        }
644        catch (CommandException ex) {
645            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
646        }
647
648        return json;
649    }
650
651
652
653    /**
654     * Get workflow job
655     *
656     * @param request servlet request
657     * @param response servlet response
658     * @return JsonBean WorkflowJobBean
659     * @throws XServletException
660     */
661    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
662        JsonBean jobBean = getWorkflowJobBean(request, response);
663        // for backward compatibility (OOZIE-1231)
664        swapMRActionID((WorkflowJob)jobBean);
665        return jobBean;
666    }
667
668    /**
669     * Get workflow job
670     *
671     * @param request servlet request
672     * @param response servlet response
673     * @return JsonBean WorkflowJobBean
674     * @throws XServletException
675     */
676    protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
677        JsonBean jobBean = null;
678        String jobId = getResourceName(request);
679        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
680        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
681        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
682        start = (start < 1) ? 1 : start;
683        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
684        len = (len < 1) ? Integer.MAX_VALUE : len;
685        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
686        try {
687            jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
688        }
689        catch (DagEngineException ex) {
690            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
691        }
692        return jobBean;
693    }
694
695    private void swapMRActionID(WorkflowJob wjBean) {
696        List<WorkflowAction> actions = wjBean.getActions();
697        if (actions != null) {
698            for (WorkflowAction wa : actions) {
699                swapMRActionID(wa);
700            }
701        }
702    }
703
704    private void swapMRActionID(WorkflowAction waBean) {
705        if (waBean.getType().equals("map-reduce")) {
706            String childId = waBean.getExternalChildIDs();
707            if (childId != null && !childId.equals("")) {
708                String consoleBase = getConsoleBase(waBean.getConsoleUrl());
709                ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
710                ((WorkflowActionBean) waBean).setExternalId(childId);
711                ((WorkflowActionBean) waBean).setExternalChildIDs("");
712            }
713        }
714    }
715
716    private String getConsoleBase(String url) {
717        String consoleBase = null;
718        if (url.indexOf("application") != -1) {
719            consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
720        }
721        else {
722            consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
723        }
724        return consoleBase;
725    }
726
727    /**
728     * Get wf action info
729     *
730     * @param request servlet request
731     * @param response servlet response
732     * @return JsonBean WorkflowActionBean
733     * @throws XServletException
734     */
735    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
736            throws XServletException {
737
738        JsonBean actionBean = getWorkflowActionBean(request, response);
739        // for backward compatibility (OOZIE-1231)
740        swapMRActionID((WorkflowAction)actionBean);
741        return actionBean;
742    }
743
744    protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
745            throws XServletException {
746        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
747
748        JsonBean actionBean = null;
749        String actionId = getResourceName(request);
750        try {
751            actionBean = dagEngine.getWorkflowAction(actionId);
752        }
753        catch (BaseEngineException ex) {
754            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
755        }
756        return actionBean;
757    }
758
759    /**
760     * Get coord job info
761     *
762     * @param request servlet request
763     * @param response servlet response
764     * @return JsonBean CoordinatorJobBean
765     * @throws XServletException
766     * @throws BaseEngineException
767     */
768    protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
769            throws XServletException, BaseEngineException {
770        JsonBean jobBean = null;
771        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
772                getUser(request));
773        String jobId = getResourceName(request);
774        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
775        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
776        String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
777        String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
778        boolean order = (orderStr != null && orderStr.equals("desc")) ? true : false;
779        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
780        start = (start < 1) ? 1 : start;
781        // Get default number of coordinator actions to be retrieved
782        int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
783        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
784        len = getCoordinatorJobLength(defaultLen, len);
785        try {
786            JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len, order);
787            jobBean = coordJob;
788        }
789        catch (CoordinatorEngineException ex) {
790            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
791        }
792
793        return jobBean;
794    }
795
796    /**
797     * Given the requested length and the default length, determine how many coordinator jobs to return.
798     * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)}
799     *
800     * @param defaultLen The default length
801     * @param len The requested length
802     * @return The length to use
803     */
804    protected int getCoordinatorJobLength(int defaultLen, int len) {
805        return (len < 1) ? defaultLen : len;
806    }
807
808    /**
809     * Get bundle job info
810     *
811     * @param request servlet request
812     * @param response servlet response
813     * @return JsonBean bundle job bean
814     * @throws XServletException
815     * @throws BaseEngineException
816     */
817    private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
818            BaseEngineException {
819        JsonBean jobBean = null;
820        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
821        String jobId = getResourceName(request);
822
823        try {
824            jobBean = (JsonBean) bundleEngine.getBundleJob(jobId);
825
826            return jobBean;
827        }
828        catch (BundleEngineException ex) {
829            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
830        }
831    }
832
833    /**
834     * Get coordinator action
835     *
836     * @param request servlet request
837     * @param response servlet response
838     * @return JsonBean CoordinatorActionBean
839     * @throws XServletException
840     * @throws BaseEngineException
841     */
842    private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
843            throws XServletException, BaseEngineException {
844        JsonBean actionBean = null;
845        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
846                getUser(request));
847        String actionId = getResourceName(request);
848        try {
849            actionBean = coordEngine.getCoordAction(actionId);
850        }
851        catch (CoordinatorEngineException ex) {
852            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
853        }
854
855        return actionBean;
856    }
857
858    /**
859     * Get wf job definition
860     *
861     * @param request servlet request
862     * @param response servlet response
863     * @return String wf definition
864     * @throws XServletException
865     */
866    private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
867            throws XServletException {
868        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
869
870        String wfDefinition;
871        String jobId = getResourceName(request);
872        try {
873            wfDefinition = dagEngine.getDefinition(jobId);
874        }
875        catch (DagEngineException ex) {
876            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
877        }
878        return wfDefinition;
879    }
880
881    /**
882     * Get bundle job definition
883     *
884     * @param request servlet request
885     * @param response servlet response
886     * @return String bundle definition
887     * @throws XServletException
888     */
889    private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
890        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
891        String bundleDefinition;
892        String jobId = getResourceName(request);
893        try {
894            bundleDefinition = bundleEngine.getDefinition(jobId);
895        }
896        catch (BundleEngineException ex) {
897            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
898        }
899        return bundleDefinition;
900    }
901
902    /**
903     * Get coordinator job definition
904     *
905     * @param request servlet request
906     * @param response servlet response
907     * @return String coord definition
908     * @throws XServletException
909     */
910    private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
911            throws XServletException {
912
913        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
914                getUser(request));
915
916        String jobId = getResourceName(request);
917
918        String coordDefinition = null;
919        try {
920            coordDefinition = coordEngine.getDefinition(jobId);
921        }
922        catch (BaseEngineException ex) {
923            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
924        }
925        return coordDefinition;
926    }
927
928    /**
929     * Stream wf job log
930     *
931     * @param request servlet request
932     * @param response servlet response
933     * @throws XServletException
934     * @throws IOException
935     */
936    private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
937            throws XServletException, IOException {
938        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
939        String jobId = getResourceName(request);
940        try {
941            dagEngine.streamLog(jobId, response.getWriter());
942        }
943        catch (DagEngineException ex) {
944            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
945        }
946    }
947
948    /**
949     * Stream bundle job log
950     *
951     * @param request servlet request
952     * @param response servlet response
953     * @throws XServletException
954     */
955    private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
956            throws XServletException, IOException {
957        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
958        String jobId = getResourceName(request);
959        try {
960            bundleEngine.streamLog(jobId, response.getWriter());
961        }
962        catch (BundleEngineException ex) {
963            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
964        }
965    }
966
967    /**
968     * Stream coordinator job log
969     *
970     * @param request servlet request
971     * @param response servlet response
972     * @throws XServletException
973     * @throws IOException
974     */
975    private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
976            throws XServletException, IOException {
977
978        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
979                getUser(request));
980        String jobId = getResourceName(request);
981        String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
982        String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
983        try {
984            coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
985        }
986        catch (BaseEngineException ex) {
987            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
988        }
989        catch (CommandException ex) {
990            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
991        }
992    }
993
994    @Override
995    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
996            IOException {
997        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
998    }
999
1000}