/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.servlet;

import java.io.IOException;
import java.util.List;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.*;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.*;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.GraphGenerator;
import org.apache.oozie.util.XLog;
import org.json.simple.JSONObject;


/**
 * Version 1 job servlet.
 * <p>
 * Every operation inspects the resource name of the request and dispatches on it: names ending in
 * <code>-W</code> go to the workflow ({@link DagEngine}) code path, names ending in <code>-B</code> go to the
 * bundle ({@link BundleEngine}) code path, and (unless stated otherwise on the method) everything else goes to
 * the coordinator ({@link CoordinatorEngine}) code path. Engine exceptions are rethrown as
 * {@link XServletException} with HTTP status 400.
 */
@SuppressWarnings("serial")
public class V1JobServlet extends BaseJobServlet {

    private static final String INSTRUMENTATION_NAME = "v1job";
    /** Configuration key read by {@link #getCoordinatorJob} for the default number of coordinator actions. */
    public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length";

    /**
     * Create the servlet using the v1 instrumentation name.
     */
    public V1JobServlet() {
        super(INSTRUMENTATION_NAME);
    }

    /**
     * Create the servlet with a caller-supplied instrumentation name (for subclasses).
     *
     * @param instrumentation_name instrumentation name passed to the base servlet
     */
    protected V1JobServlet(String instrumentation_name) {
        super(instrumentation_name);
    }

    /**
     * Start a workflow (<code>-W</code>) or bundle (<code>-B</code>) job.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 / E0303 for any other kind of job id, or if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void startJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            startWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            startBundleJob(request, response);
        }
        else {
            // start is not dispatched to the coordinator engine
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
                    RestConstants.ACTION_PARAM, RestConstants.JOB_ACTION_START);
        }
    }

    /**
     * Resume a workflow, bundle or coordinator job, chosen by the job id suffix.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void resumeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            resumeWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            resumeBundleJob(request, response);
        }
        else {
            resumeCoordinatorJob(request, response);
        }
    }

    /**
     * Suspend a workflow, bundle or coordinator job, chosen by the job id suffix.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void suspendJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            suspendWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            suspendBundleJob(request, response);
        }
        else {
            suspendCoordinatorJob(request, response);
        }
    }

    /**
     * Kill a workflow, bundle or coordinator job, chosen by the job id suffix.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void killJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            killWorkflowJob(request, response);
        }
        else if (jobId.endsWith("-B")) {
            killBundleJob(request, response);
        }
        else {
            killCoordinatorJob(request, response);
        }
    }

    /**
     * Change a bundle (<code>-B</code>) job or, for any other id, a coordinator job.
     *
     * @param request request object
     * @param response response object
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void changeJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-B")) {
            changeBundleJob(request, response);
        }
        else {
            changeCoordinatorJob(request, response);
        }
    }

    /**
     * Re-run a job, chosen by the job id suffix.
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object (forwarded to the workflow rerun only)
     * @return the JSON produced by the coordinator rerun; <code>null</code> for workflow and bundle reruns
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException declared by the overridden method
     * @see org.apache.oozie.servlet.BaseJobServlet#reRunJob(javax.servlet.http.HttpServletRequest,
     *      javax.servlet.http.HttpServletResponse, org.apache.hadoop.conf.Configuration)
     */
    @Override
    protected JSONObject reRunJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
            throws XServletException, IOException {
        JSONObject json = null;
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            reRunWorkflowJob(request, response, conf);
        }
        else if (jobId.endsWith("-B")) {
            rerunBundleJob(request, response, conf);
        }
        else {
            json = reRunCoordinatorActions(request, response, conf);
        }
        return json;
    }

    /**
     * Get a job or action in JsonBean representation.
     * <p>
     * Dispatch order: bundle job (<code>-B</code> suffix), workflow job (<code>-W</code> suffix), workflow action
     * (id contains <code>-W@</code>), coordinator action (id contains <code>-C@</code>), otherwise coordinator job.
     *
     * @param request servlet request
     * @param response servlet response
     * @return the bean returned by the selected engine
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException if reading the request body fails
     * @throws BaseEngineException declared by the coordinator and bundle getters
     */
    @Override
    protected JsonBean getJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException, BaseEngineException {
        // NOTE(review): this drains the request body and logs it at WARN; it looks like leftover debugging.
        // Kept for compatibility, but only the bytes actually read are logged now. Previously the whole buffer
        // was converted, so each entry carried stale bytes from earlier lines and trailing NULs.
        ServletInputStream is = request.getInputStream();
        byte[] b = new byte[101];
        int read;
        while ((read = is.readLine(b, 0, 100)) != -1) {
            XLog.getLog(getClass()).warn("Printing :" + new String(b, 0, read));
        }

        JsonBean jobBean;
        String jobId = getResourceName(request);
        if (jobId.endsWith("-B")) {
            jobBean = getBundleJob(request, response);
        }
        else if (jobId.endsWith("-W")) {
            jobBean = getWorkflowJob(request, response);
        }
        else if (jobId.contains("-W@")) {
            jobBean = getWorkflowAction(request, response);
        }
        else if (jobId.contains("-C@")) {
            jobBean = getCoordinatorAction(request, response);
        }
        else {
            jobBean = getCoordinatorJob(request, response);
        }

        return jobBean;
    }

    /**
     * Get a job definition in String format, chosen by the job id suffix.
     *
     * @param request servlet request
     * @param response servlet response
     * @return the definition returned by the selected engine
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException declared by the overridden method
     */
    @Override
    protected String getJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        String jobDefinition = null;
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            jobDefinition = getWorkflowJobDefinition(request, response);
        }
        else if (jobId.endsWith("-B")) {
            jobDefinition = getBundleJobDefinition(request, response);
        }
        else {
            jobDefinition = getCoordinatorJobDefinition(request, response);
        }
        return jobDefinition;
    }

    /**
     * Stream a job log into the response, chosen by the job id suffix.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the engine fails
     * @throws IOException if the response writer cannot be obtained
     */
    @Override
    protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            streamWorkflowJobLog(request, response);
        }
        else if (jobId.endsWith("-B")) {
            streamBundleJob(request, response);
        }
        else {
            streamCoordinatorJobLog(request, response);
        }
    }

    /**
     * Write a PNG graph of a workflow job to the response output stream.
     * <p>
     * The show-kill request parameter is treated as true for the values <code>yes</code>, <code>true</code>
     * (both case-insensitive) and <code>1</code>.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 / E0306 if the id is not a workflow job id, or with status
     *         500 / E0307 if anything fails while producing the graph
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void streamJobGraph(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        String jobId = getResourceName(request);
        if (jobId.endsWith("-W")) {
            // Applicable only to workflow, for now
            response.setContentType(RestConstants.PNG_IMAGE_CONTENT_TYPE);
            try {
                String showKill = request.getParameter(RestConstants.JOB_SHOW_KILL_PARAM);
                boolean sK = showKill != null && (showKill.equalsIgnoreCase("yes") || showKill.equals("1")
                        || showKill.equalsIgnoreCase("true"));

                new GraphGenerator(
                        getWorkflowJobDefinition(request, response),
                        (JsonWorkflowJob) getWorkflowJob(request, response),
                        sK).write(response.getOutputStream());
            }
            catch (Exception e) {
                // NOTE(review): this also turns the 400-status XServletException raised by the two getters
                // above into a 500; left as-is because the intended status cannot be confirmed from here.
                throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, ErrorCode.E0307,
                        e.getMessage(), e);
            }
        }
        else {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0306);
        }
    }

    /**
     * Start wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the workflow engine fails
     */
    private void startWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.start(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Start bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private void startBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.start(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Resume workflow job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the workflow engine fails
     */
    private void resumeWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.resume(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Resume bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private void resumeBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.resume(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Resume coordinator job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the coordinator engine fails
     */
    private void resumeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        String jobId = getResourceName(request);
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        try {
            coordEngine.resume(jobId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Suspend a wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the workflow engine fails
     */
    private void suspendWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.suspend(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Suspend bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private void suspendBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.suspend(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Suspend coordinator job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the coordinator engine fails
     */
    private void suspendCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        String jobId = getResourceName(request);
        try {
            coordEngine.suspend(jobId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Kill a wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the workflow engine fails
     */
    private void killWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.kill(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Kill a coord job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the coordinator engine fails
     */
    private void killCoordinatorJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        String jobId = getResourceName(request);
        try {
            coordEngine.kill(jobId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Kill bundle job
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private void killBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.kill(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Change a coordinator job using the change-value request parameter.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the coordinator engine fails
     */
    private void changeCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        String jobId = getResourceName(request);
        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
        try {
            coordEngine.change(jobId, changeValue);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Change a bundle job using the change-value request parameter.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private void changeBundleJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);
        String changeValue = request.getParameter(RestConstants.JOB_CHANGE_VALUE);
        try {
            bundleEngine.change(jobId, changeValue);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Rerun a wf job
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object
     * @throws XServletException with status 400 if the workflow engine fails
     */
    private void reRunWorkflowJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
            throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        String jobId = getResourceName(request);
        try {
            dagEngine.reRun(jobId, conf);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Rerun bundle job
     * <p>
     * Reads the coord-scope, date-scope, refresh and no-cleanup request parameters; the last two are converted
     * with {@link Boolean#valueOf(String)}, so a missing value means <code>false</code>.
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object (not used by this method)
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private void rerunBundleJob(HttpServletRequest request, HttpServletResponse response, Configuration conf)
            throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);

        String coordScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM);
        String dateScope = request.getParameter(RestConstants.JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM);
        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);

        XLog.getLog(getClass()).info(
                "Rerun Bundle for jobId=" + jobId + ", coordScope=" + coordScope + ", dateScope=" + dateScope
                        + ", refresh=" + refresh + ", noCleanup=" + noCleanup);

        try {
            bundleEngine.reRun(jobId, coordScope, dateScope, Boolean.valueOf(refresh), Boolean.valueOf(noCleanup));
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Rerun coordinator actions
     * <p>
     * The rerun type must equal {@link RestConstants#JOB_COORD_RERUN_DATE} or
     * {@link RestConstants#JOB_COORD_RERUN_ACTION}; a missing or different value is rejected with E1018.
     * If the engine returns no action info, the actions are looked up through
     * {@link CoordRerunXCommand#getCoordActions}.
     *
     * @param request servlet request
     * @param response servlet response
     * @param conf configuration object (not used by this method)
     * @return JSON object holding the coordinator actions under {@link JsonTags#COORDINATOR_ACTIONS}
     * @throws XServletException with status 400 if the rerun type is invalid or the engine/command fails
     */
    @SuppressWarnings("unchecked")
    private JSONObject reRunCoordinatorActions(HttpServletRequest request, HttpServletResponse response,
            Configuration conf) throws XServletException {
        JSONObject json = new JSONObject();
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));

        String jobId = getResourceName(request);

        String rerunType = request.getParameter(RestConstants.JOB_COORD_RERUN_TYPE_PARAM);
        String scope = request.getParameter(RestConstants.JOB_COORD_RERUN_SCOPE_PARAM);
        String refresh = request.getParameter(RestConstants.JOB_COORD_RERUN_REFRESH_PARAM);
        String noCleanup = request.getParameter(RestConstants.JOB_COORD_RERUN_NOCLEANUP_PARAM);

        XLog.getLog(getClass()).info(
                "Rerun coordinator for jobId=" + jobId + ", rerunType=" + rerunType + ",scope=" + scope + ",refresh="
                        + refresh + ", noCleanup=" + noCleanup);

        try {
            // Constant-first comparison: a missing rerun type is reported as E1018 instead of raising a
            // NullPointerException.
            boolean validType = RestConstants.JOB_COORD_RERUN_DATE.equals(rerunType)
                    || RestConstants.JOB_COORD_RERUN_ACTION.equals(rerunType);
            if (!validType) {
                throw new CommandException(ErrorCode.E1018, "date or action expected.");
            }
            CoordinatorActionInfo coordInfo = coordEngine.reRun(jobId, rerunType, scope, Boolean.valueOf(refresh),
                    Boolean.valueOf(noCleanup));
            List<CoordinatorActionBean> coordActions;
            if (coordInfo != null) {
                coordActions = coordInfo.getCoordActions();
            }
            else {
                coordActions = CoordRerunXCommand.getCoordActions(rerunType, jobId, scope);
            }
            json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(coordActions, "GMT"));
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        catch (CommandException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return json;
    }

    /**
     * Get workflow job, with the map-reduce action ids swapped for backward compatibility.
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean WorkflowJobBean
     * @throws XServletException with status 400 if the workflow engine fails or a paging parameter is invalid
     */
    protected JsonBean getWorkflowJob(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        JsonBean jobBean = getWorkflowJobBean(request, response);
        // for backward compatibility (OOZIE-1231)
        swapMRActionID((WorkflowJob) jobBean);
        return jobBean;
    }

    /**
     * Get workflow job as returned by the workflow engine.
     * <p>
     * The offset parameter defaults to 1 and is raised to 1 when smaller; the length parameter falls back to
     * {@link Integer#MAX_VALUE} when absent or smaller than 1.
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean WorkflowJobBean
     * @throws XServletException with status 400 if the workflow engine fails or a paging parameter is not an
     *         integer
     */
    protected JsonBean getWorkflowJobBean(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        JsonBean jobBean = null;
        String jobId = getResourceName(request);
        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
        int start = parseIntRequestParam(RestConstants.OFFSET_PARAM, startStr, 1);
        start = (start < 1) ? 1 : start;
        int len = parseIntRequestParam(RestConstants.LEN_PARAM, lenStr, 0);
        len = (len < 1) ? Integer.MAX_VALUE : len;
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
        try {
            jobBean = (JsonBean) dagEngine.getJob(jobId, start, len);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return jobBean;
    }

    /**
     * Parse an optional integer request parameter.
     *
     * @param paramName name of the request parameter, reported in the error
     * @param value raw parameter value, may be <code>null</code>
     * @param defaultValue value returned when the parameter is absent
     * @return the parsed value, or <code>defaultValue</code> when <code>value</code> is <code>null</code>
     * @throws XServletException with status 400 / E0303 when the value is not a valid integer, instead of
     *         letting a NumberFormatException escape
     */
    private static int parseIntRequestParam(String paramName, String value, int defaultValue)
            throws XServletException {
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value);
        }
        catch (NumberFormatException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, paramName, value);
        }
    }

    /**
     * Apply {@link #swapMRActionID(WorkflowAction)} to every action of the given workflow job.
     *
     * @param wjBean workflow job whose actions are updated in place
     */
    private void swapMRActionID(WorkflowJob wjBean) {
        List<WorkflowAction> actions = wjBean.getActions();
        if (actions != null) {
            for (WorkflowAction wa : actions) {
                swapMRActionID(wa);
            }
        }
    }

    /**
     * For a <code>map-reduce</code> action that has a non-empty external child id, make the child id the
     * external id, rebuild the console URL around it and clear the external child ids.
     *
     * @param waBean workflow action, updated in place; must be a {@link WorkflowActionBean} when it is changed
     */
    private void swapMRActionID(WorkflowAction waBean) {
        // constant-first so an action without a type is simply skipped
        if ("map-reduce".equals(waBean.getType())) {
            String childId = waBean.getExternalChildIDs();
            if (childId != null && !childId.equals("")) {
                // NOTE(review): assumes the console URL is non-null here; confirm against the engine
                String consoleBase = getConsoleBase(waBean.getConsoleUrl());
                ((WorkflowActionBean) waBean).setConsoleUrl(consoleBase + childId);
                ((WorkflowActionBean) waBean).setExternalId(childId);
                ((WorkflowActionBean) waBean).setExternalChildIDs("");
            }
        }
    }

    /**
     * Return the part of a console URL that precedes the application or job id.
     *
     * @param url console URL
     * @return the text before the first <code>application_N_N</code> id when the URL contains
     *         "application", otherwise the text before the first <code>job_N_N</code> id
     */
    private String getConsoleBase(String url) {
        String consoleBase = null;
        if (url.indexOf("application") != -1) {
            consoleBase = url.split("application_[0-9]+_[0-9]+")[0];
        }
        else {
            consoleBase = url.split("job_[0-9]+_[0-9]+")[0];
        }
        return consoleBase;
    }

    /**
     * Get wf action info, with the map-reduce action id swapped for backward compatibility.
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean WorkflowActionBean
     * @throws XServletException with status 400 if the workflow engine fails
     */
    protected JsonBean getWorkflowAction(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {

        JsonBean actionBean = getWorkflowActionBean(request, response);
        // for backward compatibility (OOZIE-1231)
        swapMRActionID((WorkflowAction) actionBean);
        return actionBean;
    }

    /**
     * Get wf action info as returned by the workflow engine.
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean WorkflowActionBean
     * @throws XServletException with status 400 if the workflow engine fails
     */
    protected JsonBean getWorkflowActionBean(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        JsonBean actionBean = null;
        String actionId = getResourceName(request);
        try {
            actionBean = dagEngine.getWorkflowAction(actionId);
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return actionBean;
    }

    /**
     * Get coord job info
     * <p>
     * Reads the offset, length, filter and order request parameters. The order flag is true only for the
     * value <code>desc</code>; the offset defaults to 1 and is raised to 1 when smaller; the length is resolved
     * by {@link #getCoordinatorJobLength(int, int)} against the configured default
     * ({@link #COORD_ACTIONS_DEFAULT_LENGTH}, 1000 when unset).
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean CoordinatorJobBean
     * @throws XServletException with status 400 if the coordinator engine fails or a paging parameter is not
     *         an integer
     * @throws BaseEngineException declared for subclasses and callers
     */
    protected JsonBean getCoordinatorJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, BaseEngineException {
        JsonBean jobBean = null;
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        String jobId = getResourceName(request);
        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
        String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
        String orderStr = request.getParameter(RestConstants.ORDER_PARAM);
        boolean order = "desc".equals(orderStr);
        int start = parseIntRequestParam(RestConstants.OFFSET_PARAM, startStr, 1);
        start = (start < 1) ? 1 : start;
        // Get default number of coordinator actions to be retrieved
        int defaultLen = Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
        int len = parseIntRequestParam(RestConstants.LEN_PARAM, lenStr, 0);
        len = getCoordinatorJobLength(defaultLen, len);
        try {
            JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId, filter, start, len, order);
            jobBean = coordJob;
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return jobBean;
    }

    /**
     * Given the requested length and the default length, determine how many coordinator jobs to return.
     * Used by {@link #getCoordinatorJob(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)}
     *
     * @param defaultLen The default length
     * @param len The requested length
     * @return The length to use: <code>defaultLen</code> when <code>len</code> is smaller than 1, else <code>len</code>
     */
    protected int getCoordinatorJobLength(int defaultLen, int len) {
        return (len < 1) ? defaultLen : len;
    }

    /**
     * Get bundle job info
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean bundle job bean
     * @throws XServletException with status 400 if the bundle engine fails
     * @throws BaseEngineException declared for callers
     */
    private JsonBean getBundleJob(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            BaseEngineException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);

        try {
            return (JsonBean) bundleEngine.getBundleJob(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Get coordinator action
     *
     * @param request servlet request
     * @param response servlet response
     * @return JsonBean CoordinatorActionBean
     * @throws XServletException with status 400 if the coordinator engine fails
     * @throws BaseEngineException declared for callers
     */
    private JsonBean getCoordinatorAction(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, BaseEngineException {
        JsonBean actionBean = null;
        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        String actionId = getResourceName(request);
        try {
            actionBean = coordEngine.getCoordAction(actionId);
        }
        catch (CoordinatorEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }

        return actionBean;
    }

    /**
     * Get wf job definition
     *
     * @param request servlet request
     * @param response servlet response
     * @return String wf definition
     * @throws XServletException with status 400 if the workflow engine fails
     */
    private String getWorkflowJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));

        String wfDefinition;
        String jobId = getResourceName(request);
        try {
            wfDefinition = dagEngine.getDefinition(jobId);
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return wfDefinition;
    }

    /**
     * Get bundle job definition
     *
     * @param request servlet request
     * @param response servlet response
     * @return String bundle definition
     * @throws XServletException with status 400 if the bundle engine fails
     */
    private String getBundleJobDefinition(HttpServletRequest request, HttpServletResponse response) throws XServletException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String bundleDefinition;
        String jobId = getResourceName(request);
        try {
            bundleDefinition = bundleEngine.getDefinition(jobId);
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return bundleDefinition;
    }

    /**
     * Get coordinator job definition
     *
     * @param request servlet request
     * @param response servlet response
     * @return String coord definition
     * @throws XServletException with status 400 if the coordinator engine fails
     */
    private String getCoordinatorJobDefinition(HttpServletRequest request, HttpServletResponse response)
            throws XServletException {

        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));

        String jobId = getResourceName(request);

        String coordDefinition = null;
        try {
            coordDefinition = coordEngine.getDefinition(jobId);
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        return coordDefinition;
    }

    /**
     * Stream wf job log
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the workflow engine fails
     * @throws IOException if the response writer cannot be obtained
     */
    private void streamWorkflowJobLog(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
        String jobId = getResourceName(request);
        try {
            dagEngine.streamLog(jobId, response.getWriter());
        }
        catch (DagEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Stream bundle job log
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the bundle engine fails
     * @throws IOException if the response writer cannot be obtained
     */
    private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {
        BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
        String jobId = getResourceName(request);
        try {
            bundleEngine.streamLog(jobId, response.getWriter());
        }
        catch (BundleEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * Stream coordinator job log, passing the log scope and log type request parameters to the engine.
     *
     * @param request servlet request
     * @param response servlet response
     * @throws XServletException with status 400 if the coordinator engine or command fails
     * @throws IOException if the response writer cannot be obtained
     */
    private void streamCoordinatorJobLog(HttpServletRequest request, HttpServletResponse response)
            throws XServletException, IOException {

        CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
                getUser(request));
        String jobId = getResourceName(request);
        String logRetrievalScope = request.getParameter(RestConstants.JOB_LOG_SCOPE_PARAM);
        String logRetrievalType = request.getParameter(RestConstants.JOB_LOG_TYPE_PARAM);
        try {
            coordEngine.streamLog(jobId, logRetrievalScope, logRetrievalType, response.getWriter());
        }
        catch (BaseEngineException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
        catch (CommandException ex) {
            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
        }
    }

    /**
     * JMS topic lookup is not available in the v1 API.
     *
     * @param request servlet request
     * @param response servlet response
     * @return never returns normally
     * @throws XServletException always, with status 400 / E0302
     * @throws IOException declared by the overridden method
     */
    @Override
    protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException,
            IOException {
        throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1");
    }

}