001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.servlet;
019
020import java.io.IOException;
021import java.util.Arrays;
022
023import javax.servlet.ServletException;
024import javax.servlet.http.HttpServletRequest;
025import javax.servlet.http.HttpServletResponse;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.BaseEngineException;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.client.OozieClient;
031import org.apache.oozie.client.XOozieClient;
032import org.apache.oozie.client.rest.JsonBean;
033import org.apache.oozie.client.rest.JsonTags;
034import org.apache.oozie.client.rest.RestConstants;
035import org.apache.oozie.service.AuthorizationException;
036import org.apache.oozie.service.AuthorizationService;
037import org.apache.oozie.service.Services;
038import org.apache.oozie.service.XLogService;
039import org.apache.oozie.util.ConfigUtils;
040import org.apache.oozie.util.JobUtils;
041import org.apache.oozie.util.XConfiguration;
042import org.apache.oozie.util.XLog;
043import org.json.simple.JSONObject;
044
045public abstract class BaseJobServlet extends JsonRestServlet {
046
047    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
048
049    static {
050        RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
051                RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
052                RestConstants.JOB_SHOW_PARAM, String.class, false, Arrays.asList("GET")), new ParameterInfo(
053                        RestConstants.ORDER_PARAM, String.class, false, Arrays.asList("GET"))));
054    }
055
056    public BaseJobServlet(String instrumentationName) {
057        super(instrumentationName, RESOURCES_INFO);
058    }
059
060    /**
061     * Perform various job related actions - start, suspend, resume, kill, etc.
062     */
063    @Override
064    protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
065        String jobId = getResourceName(request);
066        request.setAttribute(AUDIT_PARAM, jobId);
067        request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM));
068        try {
069            AuthorizationService auth = Services.get().get(AuthorizationService.class);
070            auth.authorizeForJob(getUser(request), jobId, true);
071        }
072        catch (AuthorizationException ex) {
073            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
074        }
075
076        String action = request.getParameter(RestConstants.ACTION_PARAM);
077        if (action.equals(RestConstants.JOB_ACTION_START)) {
078            stopCron();
079            startJob(request, response);
080            startCron();
081            response.setStatus(HttpServletResponse.SC_OK);
082        }
083        else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
084            stopCron();
085            resumeJob(request, response);
086            startCron();
087            response.setStatus(HttpServletResponse.SC_OK);
088        }
089        else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
090            stopCron();
091            suspendJob(request, response);
092            startCron();
093            response.setStatus(HttpServletResponse.SC_OK);
094        }
095        else if (action.equals(RestConstants.JOB_ACTION_KILL)) {
096            stopCron();
097            killJob(request, response);
098            startCron();
099            response.setStatus(HttpServletResponse.SC_OK);
100        }
101        else if (action.equals(RestConstants.JOB_ACTION_CHANGE)) {
102            stopCron();
103            changeJob(request, response);
104            startCron();
105            response.setStatus(HttpServletResponse.SC_OK);
106        }
107        else if (action.equals(RestConstants.JOB_ACTION_RERUN)) {
108            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
109            Configuration conf = new XConfiguration(request.getInputStream());
110            stopCron();
111            String requestUser = getUser(request);
112            if (!requestUser.equals(UNDEF)) {
113                conf.set(OozieClient.USER_NAME, requestUser);
114            }
115            BaseJobServlet.checkAuthorizationForApp(conf);
116            JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
117            reRunJob(request, response, conf);
118            startCron();
119            response.setStatus(HttpServletResponse.SC_OK);
120        }
121        else if (action.equals(RestConstants.JOB_COORD_ACTION_RERUN)) {
122            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
123            stopCron();
124            JSONObject json = reRunJob(request, response, null);
125            startCron();
126            if (json != null) {
127                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
128            }
129            else {
130                response.setStatus(HttpServletResponse.SC_OK);
131            }
132        }
133        else if (action.equals(RestConstants.JOB_BUNDLE_ACTION_RERUN)) {
134            validateContentType(request, RestConstants.XML_CONTENT_TYPE);
135            stopCron();
136            JSONObject json = reRunJob(request, response, null);
137            startCron();
138            if (json != null) {
139                sendJsonResponse(response, HttpServletResponse.SC_OK, json);
140            }
141            else {
142                response.setStatus(HttpServletResponse.SC_OK);
143            }
144        }
145        else {
146            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
147                    RestConstants.ACTION_PARAM, action);
148        }
149    }
150
151    /**
152     * Validate the configuration user/group. <p/>
153     *
154     * @param conf configuration.
155     * @throws XServletException thrown if the configuration does not have a property {@link
156     * org.apache.oozie.client.OozieClient#USER_NAME}.
157     */
158    static void checkAuthorizationForApp(Configuration conf) throws XServletException {
159        String user = conf.get(OozieClient.USER_NAME);
160        String acl = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.GROUP_NAME, OozieClient.JOB_ACL, null);
161        try {
162            if (user == null) {
163                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0401, OozieClient.USER_NAME);
164            }
165            AuthorizationService auth = Services.get().get(AuthorizationService.class);
166
167            if (acl != null){
168                 conf.set(OozieClient.GROUP_NAME, acl);
169            }
170            else if (acl == null && auth.useDefaultGroupAsAcl()) {
171                acl = auth.getDefaultGroup(user);
172                conf.set(OozieClient.GROUP_NAME, acl);
173            }
174            XLog.Info.get().setParameter(XLogService.GROUP, acl);
175            String wfPath = conf.get(OozieClient.APP_PATH);
176            String coordPath = conf.get(OozieClient.COORDINATOR_APP_PATH);
177            String bundlePath = conf.get(OozieClient.BUNDLE_APP_PATH);
178
179            if (wfPath == null && coordPath == null && bundlePath == null) {
180                String[] libPaths = conf.getStrings(XOozieClient.LIBPATH);
181                if (libPaths != null && libPaths.length > 0 && libPaths[0].trim().length() > 0) {
182                    conf.set(OozieClient.APP_PATH, libPaths[0].trim());
183                    wfPath = libPaths[0].trim();
184                }
185                else {
186                    throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0405);
187                }
188            }
189            ServletUtilities.ValidateAppPath(wfPath, coordPath, bundlePath);
190
191            if (wfPath != null) {
192                auth.authorizeForApp(user, acl, wfPath, "workflow.xml", conf);
193            }
194            else if (coordPath != null){
195                auth.authorizeForApp(user, acl, coordPath, "coordinator.xml", conf);
196            }
197            else if (bundlePath != null){
198                auth.authorizeForApp(user, acl, bundlePath, "bundle.xml", conf);
199            }
200        }
201        catch (AuthorizationException ex) {
202            XLog.getLog(BaseJobServlet.class).info("AuthorizationException ", ex);
203            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
204        }
205    }
206
207    /**
208     * Return information about jobs.
209     */
210    @Override
211    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
212        String jobId = getResourceName(request);
213        String show = request.getParameter(RestConstants.JOB_SHOW_PARAM);
214        String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
215                ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
216
217        try {
218            AuthorizationService auth = Services.get().get(AuthorizationService.class);
219            auth.authorizeForJob(getUser(request), jobId, false);
220        }
221        catch (AuthorizationException ex) {
222            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex);
223        }
224
225        if (show == null || show.equals(RestConstants.JOB_SHOW_INFO)) {
226            stopCron();
227            JsonBean job = null;
228            try {
229                job = getJob(request, response);
230            }
231            catch (BaseEngineException e) {
232                // TODO Auto-generated catch block
233                // e.printStackTrace();
234
235                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e);
236            }
237            startCron();
238            sendJsonResponse(response, HttpServletResponse.SC_OK, job, timeZoneId);
239        }
240
241        else if (show.equals(RestConstants.JOB_SHOW_JMS_TOPIC)) {
242            stopCron();
243            String jmsTopicName = getJMSTopicName(request, response);
244            JSONObject json = new JSONObject();
245            json.put(JsonTags.JMS_TOPIC_NAME, jmsTopicName);
246            startCron();
247            sendJsonResponse(response, HttpServletResponse.SC_OK, json);
248        }
249
250        else if (show.equals(RestConstants.JOB_SHOW_LOG)) {
251            response.setContentType(TEXT_UTF8);
252            streamJobLog(request, response);
253        }
254        else if (show.equals(RestConstants.JOB_SHOW_DEFINITION)) {
255            stopCron();
256            response.setContentType(XML_UTF8);
257            String wfDefinition = getJobDefinition(request, response);
258            startCron();
259            response.setStatus(HttpServletResponse.SC_OK);
260            response.getWriter().write(wfDefinition);
261        }
262        else if (show.equals(RestConstants.JOB_SHOW_GRAPH)) {
263            stopCron();
264            streamJobGraph(request, response);
265            startCron(); // -- should happen before you stream anything in response?
266        }
267        else {
268            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
269                    RestConstants.JOB_SHOW_PARAM, show);
270        }
271    }
272
273    /**
274     * abstract method to start a job, either workflow or coordinator
275     *
276     * @param request
277     * @param response
278     * @throws XServletException
279     * @throws IOException TODO
280     */
281    abstract void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
282            IOException;
283
284    /**
285     * abstract method to resume a job, either workflow or coordinator
286     *
287     * @param request
288     * @param response
289     * @throws XServletException
290     * @throws IOException TODO
291     */
292    abstract void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
293            IOException;
294
295    /**
296     * abstract method to suspend a job, either workflow or coordinator
297     *
298     * @param request
299     * @param response
300     * @throws XServletException
301     * @throws IOException TODO
302     */
303    abstract void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
304            IOException;
305
306    /**
307     * abstract method to kill a job, either workflow or coordinator
308     *
309     * @param request
310     * @param response
311     * @throws XServletException
312     * @throws IOException TODO
313     */
314    abstract void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
315            IOException;
316
317    /**
318     * abstract method to change a coordinator job
319     *
320     * @param request
321     * @param response
322     * @throws XServletException
323     * @throws IOException TODO
324     */
325    abstract void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
326            IOException;
327
328    /**
329     * abstract method to re-run a job, either workflow or coordinator
330     *
331     * @param request
332     * @param response
333     * @param conf
334     * @throws XServletException
335     * @throws IOException TODO
336     */
337    abstract JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
338            throws XServletException, IOException;
339
340    /**
341     * abstract method to get a job, either workflow or coordinator, in JsonBean representation
342     *
343     * @param request
344     * @param response
345     * @return JsonBean representation of a job, either workflow or coordinator
346     * @throws XServletException
347     * @throws IOException TODO
348     * @throws BaseEngineException
349     */
350    abstract JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
351            IOException, BaseEngineException;
352
353    /**
354     * abstract method to get definition of a job, either workflow or coordinator
355     *
356     * @param request
357     * @param response
358     * @return job, either workflow or coordinator, definition in string format
359     * @throws XServletException
360     * @throws IOException TODO
361     */
362    abstract String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
363            throws XServletException, IOException;
364
365    /**
366     * abstract method to get and stream log information of job, either workflow or coordinator
367     *
368     * @param request
369     * @param response
370     * @throws XServletException
371     * @throws IOException
372     */
373    abstract void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
374            IOException;
375
376    /**
377     * abstract method to create and stream image for runtime DAG -- workflow only
378     *
379     * @param request
380     * @param response
381     * @throws XServletException
382     * @throws IOException
383     */
384    abstract void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
385            throws XServletException, IOException;
386
387    /**
388     * abstract method to get JMS topic name for a job
389     * @param request
390     * @param response
391     * @throws XServletException
392     * @throws IOException
393     */
394    abstract String getJMSTopicName(HttpServletRequest request, HttpServletResponse response)
395            throws XServletException, IOException;
396}