001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import java.util.TreeSet; 027import java.util.Comparator; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.oozie.BundleActionBean; 031import org.apache.oozie.BundleJobBean; 032import org.apache.oozie.CoordinatorJobBean; 033import org.apache.oozie.ErrorCode; 034import org.apache.oozie.client.CoordinatorAction; 035import org.apache.oozie.client.Job; 036import org.apache.oozie.command.CommandException; 037import org.apache.oozie.command.bundle.BundleKillXCommand; 038import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 039import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor; 040import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor; 041import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 042import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 043import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor; 044import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor; 045import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor; 046import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 047import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor; 048import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 049import org.apache.oozie.executor.jpa.CoordJobsGetChangedJPAExecutor; 050import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor; 051import org.apache.oozie.executor.jpa.JPAExecutorException; 052import org.apache.oozie.util.DateUtils; 053import org.apache.oozie.util.MemoryLocks; 054import org.apache.oozie.util.StatusUtils; 055import org.apache.oozie.util.XLog; 056 057/** 058 * StateTransitService is scheduled to run at the configured interval. 059 * <p/> 060 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job 061 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 062 * SUCCEEDED. 063 */ 064public class StatusTransitService implements Service { 065 public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService."; 066 public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval"; 067 public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status"; 068 public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error"; 069 private static int limit = -1; 070 private static Date lastInstanceStartTime = null; 071 private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class); 072 073 /** 074 * StateTransitRunnable is the runnable which is scheduled to run at the configured interval. 075 * <p/> 076 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 077 * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to 078 * SUCCEEDED. 079 */ 080 public static class StatusTransitRunnable implements Runnable { 081 private JPAService jpaService = null; 082 private MemoryLocks.LockToken lock; 083 084 public StatusTransitRunnable() { 085 jpaService = Services.get().get(JPAService.class); 086 if (jpaService == null) { 087 LOG.error("Missing JPAService"); 088 } 089 } 090 091 @Override 092 public void run() { 093 try { 094 Date curDate = new Date(); // records the start time of this service run; 095 096 // first check if there is some other instance running; 097 lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(), 098 lockTimeout); 099 if (lock == null) { 100 LOG.info("This StatusTransitService instance" 101 + " will not run since there is already an instance running"); 102 } 103 else { 104 LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName()); 105 // running coord jobs transit service 106 coordTransit(); 107 // running bundle jobs transit service 108 bundleTransit(); 109 110 lastInstanceStartTime = curDate; 111 } 112 } 113 catch (Exception ex) { 114 LOG.warn("Exception happened during StatusTransitRunnable ", ex); 115 } 116 finally { 117 // release lock; 118 if (lock != null) { 119 lock.release(); 120 LOG.info("Released lock for [{0}]", StatusTransitService.class.getName()); 121 } 122 } 123 } 124 125 public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) { 126 Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() { 127 @Override 128 public int compare(BundleJobBean b1, BundleJobBean b2) { 129 if (b1.getId().equals(b2.getId())) { 130 return 0; 131 } 132 else 133 return 1; 134 } 135 }); 136 s.addAll(pendingJobList); 137 return new ArrayList<BundleJobBean>(s); 138 } 139 140 /** 141 * Aggregate bundle actions' status to bundle jobs 142 * 143 * @throws JPAExecutorException thrown if failed in db updates or retrievals 144 * @throws CommandException thrown if failed to run commands 145 */ 146 private void bundleTransit() throws JPAExecutorException, CommandException { 147 List<BundleJobBean> pendingJobCheckList = null; 148 149 if (lastInstanceStartTime == null) { 150 LOG.info("Running bundle status service first instance"); 151 // this is the first instance, we need to check for all pending or running jobs; 152 pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit)); 153 } 154 else { 155 LOG.info("Running bundle status service from last instance time = " 156 + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); 157 // this is not the first instance, we should only check jobs that have actions been 158 // updated >= start time of last service run; 159 List<BundleActionBean> actionList = jpaService 160 .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); 161 Set<String> bundleIds = new HashSet<String>(); 162 for (BundleActionBean action : actionList) { 163 bundleIds.add(action.getBundleId()); 164 } 165 pendingJobCheckList = new ArrayList<BundleJobBean>(); 166 for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) { 167 BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); 168 // Running bundle job might have pending false 169 if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING) 170 || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR) 171 || bundle.getStatus().equals(Job.Status.PAUSED) 172 || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) { 173 pendingJobCheckList.add(bundle); 174 } 175 } 176 } 177 aggregateBundleJobsStatus(pendingJobCheckList); 178 } 179 180 private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException, 181 CommandException { 182 if (bundleLists != null) { 183 for (BundleJobBean bundleJob : bundleLists) { 184 try { 185 String jobId = bundleJob.getId(); 186 Job.Status[] bundleStatus = new Job.Status[1]; 187 bundleStatus[0] = bundleJob.getStatus(); 188 List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor( 189 jobId)); 190 HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>(); 191 boolean foundPending = false; 192 for (BundleActionBean bAction : bundleActions) { 193 int counter = 0; 194 if (bundleActionStatus.containsKey(bAction.getStatus())) { 195 counter = bundleActionStatus.get(bAction.getStatus()) + 1; 196 } 197 else { 198 ++counter; 199 } 200 bundleActionStatus.put(bAction.getStatus(), counter); 201 if (bAction.getCoordId() == null 202 && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) { 203 (new BundleKillXCommand(jobId)).call(); 204 LOG.info("Bundle job ["+ jobId 205 + "] has been killed since one of its coordinator job failed submission."); 206 } 207 208 if (bAction.isPending()) { 209 foundPending = true; 210 } 211 } 212 213 if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) { 214 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 215 + "' from '" + bundleJob.getStatus() + "'"); 216 updateBundleJob(foundPending, bundleJob, bundleStatus[0]); 217 } 218 else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) { 219 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 220 + "' from '" + bundleJob.getStatus() + "'"); 221 updateBundleJob(foundPending, bundleJob, bundleStatus[0]); 222 } 223 else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) { 224 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 225 + "' from '" + bundleJob.getStatus() + "'"); 226 updateBundleJob(foundPending, bundleJob, bundleStatus[0]); 227 } 228 else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) { 229 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 230 + "' from '" + bundleJob.getStatus() + "'"); 231 updateBundleJob(foundPending, bundleJob, bundleStatus[0]); 232 } 233 else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) { 234 LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString() 235 + "' from '" + bundleJob.getStatus() + "'"); 236 updateBundleJob(foundPending, bundleJob, bundleStatus[0]); 237 } 238 } 239 catch (Exception ex) { 240 LOG.error("Exception happened during aggregate bundle job's status, job = " 241 + bundleJob.getId(), ex); 242 } 243 } 244 } 245 246 } 247 248 private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException, 249 CommandException { 250 if (CoordList != null) { 251 Configuration conf = Services.get().getConf(); 252 boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false); 253 254 for (CoordinatorJobBean coordJob : CoordList) { 255 try { 256 // if namespace 0.1 is used and backward support is true, then ignore this coord job 257 if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null 258 && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 259 continue; 260 } 261 String jobId = coordJob.getId(); 262 Job.Status[] coordStatus = new Job.Status[1]; 263 coordStatus[0] = coordJob.getStatus(); 264 //Get count of Coordinator actions with pending true 265 boolean isPending = false; 266 int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId)); 267 if (count > 0) { 268 isPending = true; 269 } 270 // Get status of Coordinator actions 271 List<CoordinatorAction.Status> coordActionStatusList = jpaService 272 .execute(new CoordJobGetActionsStatusJPAExecutor(jobId)); 273 HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>(); 274 275 for (CoordinatorAction.Status status : coordActionStatusList) { 276 int counter = 0; 277 if (coordActionStatus.containsKey(status)) { 278 counter = coordActionStatus.get(status) + 1; 279 } 280 else { 281 ++counter; 282 } 283 coordActionStatus.put(status, counter); 284 } 285 286 int nonPendingCoordActionsCount = coordActionStatusList.size(); 287 boolean isDoneMaterialization = coordJob.isDoneMaterialization(); 288 if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED) 289 && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount, 290 coordStatus, isDoneMaterialization)) { 291 LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString() 292 + "' from '" + coordJob.getStatus() + "'"); 293 updateCoordJob(isPending, coordJob, coordStatus[0]); 294 } 295 else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { 296 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 297 + "' from '" + coordJob.getStatus() + "'"); 298 updateCoordJob(isPending, coordJob, coordStatus[0]); 299 } 300 else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) { 301 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 302 + "' from '" + coordJob.getStatus() + "'"); 303 updateCoordJob(isPending, coordJob, coordStatus[0]); 304 } 305 else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) { 306 LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString() 307 + "' from '" + coordJob.getStatus() + "'"); 308 updateCoordJob(isPending, coordJob, coordStatus[0]); 309 } 310 else { 311 checkCoordPending(isPending, coordJob, true); 312 } 313 } 314 catch (Exception ex) { 315 LOG.error("Exception happened during aggregate coordinator job's status, job = " 316 + coordJob.getId(), ex); 317 } 318 } 319 320 } 321 } 322 323 private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus, 324 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 325 boolean ret = false; 326 int totalValuesSucceed = 0; 327 if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) { 328 totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED); 329 } 330 int totalValuesFailed = 0; 331 if (bundleActionStatus.containsKey(Job.Status.FAILED)) { 332 totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED); 333 } 334 int totalValuesKilled = 0; 335 if (bundleActionStatus.containsKey(Job.Status.KILLED)) { 336 totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED); 337 } 338 339 int totalValuesDoneWithError = 0; 340 if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) { 341 totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR); 342 } 343 344 if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) { 345 // If all the bundle actions are succeeded then bundle job should be succeeded. 346 if (bundleActions.size() == totalValuesSucceed) { 347 bundleStatus[0] = Job.Status.SUCCEEDED; 348 ret = true; 349 } 350 else if (bundleActions.size() == totalValuesKilled) { 351 // If all the bundle actions are KILLED then bundle job should be KILLED. 352 bundleStatus[0] = Job.Status.KILLED; 353 ret = true; 354 } 355 else if (bundleActions.size() == totalValuesFailed) { 356 // If all the bundle actions are FAILED then bundle job should be FAILED. 357 bundleStatus[0] = Job.Status.FAILED; 358 ret = true; 359 } 360 else { 361 bundleStatus[0] = Job.Status.DONEWITHERROR; 362 ret = true; 363 } 364 } 365 return ret; 366 } 367 368 private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 369 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) { 370 boolean ret = false; 371 int totalValuesSucceed = 0; 372 if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) { 373 totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED); 374 } 375 int totalValuesFailed = 0; 376 if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) { 377 totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED); 378 } 379 int totalValuesKilled = 0; 380 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) { 381 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED); 382 } 383 384 int totalValuesTimeOut = 0; 385 if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { 386 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT); 387 } 388 389 if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) { 390 // If all the coordinator actions are succeeded then coordinator job should be succeeded. 391 if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) { 392 coordStatus[0] = Job.Status.SUCCEEDED; 393 ret = true; 394 } 395 else if (coordActionsCount == totalValuesKilled) { 396 // If all the coordinator actions are KILLED then coordinator job should be KILLED. 397 coordStatus[0] = Job.Status.KILLED; 398 ret = true; 399 } 400 else if (coordActionsCount == totalValuesFailed) { 401 // If all the coordinator actions are FAILED then coordinator job should be FAILED. 402 coordStatus[0] = Job.Status.FAILED; 403 ret = true; 404 } 405 else { 406 coordStatus[0] = Job.Status.DONEWITHERROR; 407 ret = true; 408 } 409 } 410 return ret; 411 } 412 413 private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus, 414 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 415 boolean ret = false; 416 if (bundleActionStatus.containsKey(Job.Status.PREP)) { 417 // If all the bundle actions are PREP then bundle job should be RUNNING. 418 if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) { 419 bundleStatus[0] = Job.Status.RUNNING; 420 ret = true; 421 } 422 } 423 return ret; 424 } 425 426 private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus, 427 List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) { 428 boolean ret = false; 429 430 // TODO - When bottom up cmds are allowed to change the status of parent job, 431 // if none of the bundle actions are in paused or pausedwitherror, the function should return 432 // false 433 434 // top down 435 // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error 436 // state, then job should be PAUSED otherwise it should be pausedwitherror 437 if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) { 438 if (bundleActionStatus.containsKey(Job.Status.KILLED) 439 || bundleActionStatus.containsKey(Job.Status.FAILED) 440 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) 441 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) 442 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR) 443 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { 444 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR; 445 } 446 else { 447 bundleJobStatus[0] = Job.Status.PAUSED; 448 } 449 ret = true; 450 } 451 452 // bottom up; check the status of parent through their children 453 else if (bundleActionStatus.containsKey(Job.Status.PAUSED) 454 && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) { 455 bundleJobStatus[0] = Job.Status.PAUSED; 456 ret = true; 457 } 458 else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { 459 int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus 460 .get(Job.Status.PAUSED) : 0; 461 if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) { 462 bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR; 463 ret = true; 464 } 465 } 466 else { 467 ret = false; 468 } 469 return ret; 470 } 471 472 473 private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus, 474 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) { 475 boolean ret = false; 476 477 // TODO - When bottom up cmds are allowed to change the status of parent job, 478 // if none of the bundle actions are in suspended or suspendedwitherror, the function should return 479 // false 480 481 // top down 482 // if job is suspended 483 if (bundleStatus[0] == Job.Status.SUSPENDED 484 || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) { 485 if (bundleActionStatus.containsKey(Job.Status.KILLED) 486 || bundleActionStatus.containsKey(Job.Status.FAILED) 487 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) 488 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR) 489 || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) { 490 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR; 491 } 492 else { 493 bundleStatus[0] = Job.Status.SUSPENDED; 494 } 495 ret =true; 496 } 497 498 // bottom up 499 // Update status of parent from the status of its children 500 else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED) 501 || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) { 502 int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus 503 .get(Job.Status.SUCCEEDED) : 0; 504 int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus 505 .get(Job.Status.KILLED) : 0; 506 int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus 507 .get(Job.Status.FAILED) : 0; 508 int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus 509 .get(Job.Status.DONEWITHERROR) : 0; 510 511 if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) { 512 bundleStatus[0] = Job.Status.SUSPENDED; 513 ret = true; 514 } 515 else if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR) 516 + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) { 517 bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR; 518 ret = true; 519 } 520 } 521 return ret; 522 523 } 524 525 private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 526 int coordActionsCount, Job.Status[] coordStatus){ 527 boolean ret = false; 528 if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) { 529 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) 530 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) 531 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { 532 coordStatus[0] = Job.Status.PAUSEDWITHERROR; 533 } 534 else { 535 coordStatus[0] = Job.Status.PAUSED; 536 } 537 ret = true; 538 } 539 return ret; 540 } 541 private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 542 int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) { 543 boolean ret = false; 544 545 // TODO - When bottom up cmds are allowed to change the status of parent job 546 //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false 547 //,then the function should return 548 // false 549 550 // top down 551 // check for children only when parent is suspended 552 if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) { 553 554 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) 555 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) 556 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { 557 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR; 558 } 559 else { 560 coordStatus[0] = Job.Status.SUSPENDED; 561 } 562 ret = true; 563 } 564 // bottom up 565 // look for children to check the parent's status only if materialization is 566 // done and all actions are non-pending 567 else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) { 568 int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus 569 .get(CoordinatorAction.Status.SUCCEEDED) : 0; 570 int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus 571 .get(CoordinatorAction.Status.KILLED) : 0; 572 int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus 573 .get(CoordinatorAction.Status.FAILED) : 0; 574 int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus 575 .get(CoordinatorAction.Status.TIMEDOUT) : 0; 576 577 if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) { 578 coordStatus[0] = Job.Status.SUSPENDED; 579 ret = true; 580 } 581 else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) 582 + succeededActions + killedActions + failedActions + timedoutActions) { 583 coordStatus[0] = Job.Status.SUSPENDEDWITHERROR; 584 ret = true; 585 } 586 } 587 return ret; 588 } 589 590 private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus, 591 int coordActionsCount, Job.Status[] coordStatus) { 592 boolean ret = false; 593 if (coordStatus[0] != Job.Status.PREP) { 594 if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) 595 || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) 596 || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) { 597 coordStatus[0] = Job.Status.RUNNINGWITHERROR; 598 } 599 else { 600 coordStatus[0] = Job.Status.RUNNING; 601 } 602 ret = true; 603 } 604 return ret; 605 } 606 607 private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus, 608 List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) { 609 boolean ret = false; 610 if (bundleStatus[0] != Job.Status.PREP) { 611 if (bundleActionStatus.containsKey(Job.Status.FAILED) 612 || bundleActionStatus.containsKey(Job.Status.KILLED) 613 || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) 614 || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) { 615 bundleStatus[0] = Job.Status.RUNNINGWITHERROR; 616 } 617 else { 618 bundleStatus[0] = Job.Status.RUNNING; 619 } 620 ret = true; 621 } 622 return ret; 623 624 } 625 626 private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus) 627 throws JPAExecutorException { 628 String jobId = bundleJob.getId(); 629 // Update the Bundle Job 630 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 631 // PAUSEDWITHERROR is not supported 632 bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus)); 633 if (isPending) { 634 bundleJob.setPending(); 635 LOG.info("Bundle job [" + jobId + "] Pending set to TRUE"); 636 } 637 else { 638 bundleJob.resetPending(); 639 LOG.info("Bundle job [" + jobId + "] Pending set to FALSE"); 640 } 641 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 642 } 643 644 private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus) 645 throws JPAExecutorException, CommandException { 646 Job.Status prevStatus = coordJob.getStatus(); 647 // Update the Coord Job 648 if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED 649 || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) { 650 if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) { 651 LOG.info("Coord Job [" + coordJob.getId() 652 + "] status to "+ coordStatus +" can not be updated as its already in Terminal state"); 653 return; 654 } 655 } 656 657 boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false); 658 // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is 659 // not supported 660 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus)); 661 // Backward support when coordinator namespace is 0.1 662 coordJob.setStatus(StatusUtils.getStatus(coordJob)); 663 if (coordJob.getStatus() != prevStatus || isPendingStateChanged) { 664 LOG.debug("Updating coord job " + coordJob.getId()); 665 coordJob.setLastModifiedTime(new Date()); 666 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 667 } 668 // update bundle action only when status changes in coord job 669 if (coordJob.getBundleId() != null) { 670 if (!prevStatus.equals(coordJob.getStatus())) { 671 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 672 bundleStatusUpdate.call(); 673 } 674 } 675 } 676 677 private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB) 678 throws JPAExecutorException { 679 // Checking the coordinator pending should be updated or not 680 boolean prevPending = coordJob.isPending(); 681 if (isPending) { 682 coordJob.setPending(); 683 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE"); 684 } 685 else { 686 coordJob.resetPending(); 687 LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE"); 688 } 689 boolean hasChange = prevPending != coordJob.isPending(); 690 if (saveToDB && hasChange) { 691 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 692 } 693 return hasChange; 694 695 } 696 697 /** 698 * Aggregate coordinator actions' status to coordinator jobs 699 * 700 * @throws JPAExecutorException thrown if failed in db updates or retrievals 701 * @throws CommandException thrown if failed to run commands 702 */ 703 private void coordTransit() throws JPAExecutorException, CommandException { 704 List<CoordinatorJobBean> pendingJobCheckList = null; 705 if (lastInstanceStartTime == null) { 706 LOG.info("Running coordinator status service first instance"); 707 // this is the first instance, we need to check for all pending jobs; 708 pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit)); 709 } 710 else { 711 LOG.info("Running coordinator status service from last instance time = " 712 + DateUtils.formatDateOozieTZ(lastInstanceStartTime)); 713 // this is not the first instance, we should only check jobs 714 // that have actions or jobs been 715 // updated >= start time of last service run; 716 List<String> coordJobIdList = jpaService 717 .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime)); 718 Set<String> coordIds = new HashSet<String>(); 719 for (String coordJobId : coordJobIdList) { 720 coordIds.add(coordJobId); 721 } 722 coordIds.addAll(jpaService.execute(new CoordJobsGetChangedJPAExecutor(lastInstanceStartTime))); 723 724 pendingJobCheckList = new ArrayList<CoordinatorJobBean>(); 725 for (String coordId : coordIds.toArray(new String[coordIds.size()])) { 726 CoordinatorJobBean coordJob; 727 try{ 728 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId)); 729 } 730 catch (JPAExecutorException jpaee) { 731 if (jpaee.getErrorCode().equals(ErrorCode.E0604)) { 732 LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee); 733 continue; 734 } else { 735 throw jpaee; 736 } 737 } 738 // Running coord job might have pending false 739 Job.Status coordJobStatus = coordJob.getStatus(); 740 if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED) 741 || coordJobStatus.equals(Job.Status.RUNNING) 742 || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR) 743 || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) { 744 pendingJobCheckList.add(coordJob); 745 } 746 } 747 } 748 aggregateCoordJobsStatus(pendingJobCheckList); 749 } 750 } 751 752 /** 753 * Initializes the {@link StatusTransitService}. 754 * 755 * @param services services instance. 756 */ 757 @Override 758 public void init(Services services) { 759 Configuration conf = services.getConf(); 760 Runnable stateTransitRunnable = new StatusTransitRunnable(); 761 services.get(SchedulerService.class).schedule(stateTransitRunnable, 10, 762 conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC); 763 } 764 765 /** 766 * Destroy the StateTransit Jobs Service. 767 */ 768 @Override 769 public void destroy() { 770 } 771 772 /** 773 * Return the public interface for the purge jobs service. 774 * 775 * @return {@link StatusTransitService}. 776 */ 777 @Override 778 public Class<? extends Service> getInterface() { 779 return StatusTransitService.class; 780 } 781} 782