001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.Comparator;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.BundleActionBean;
031import org.apache.oozie.BundleJobBean;
032import org.apache.oozie.CoordinatorJobBean;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.client.CoordinatorAction;
035import org.apache.oozie.client.Job;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.bundle.BundleKillXCommand;
038import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
039import org.apache.oozie.executor.jpa.BundleActionsGetByLastModifiedTimeJPAExecutor;
040import org.apache.oozie.executor.jpa.BundleActionsGetStatusPendingJPAExecutor;
041import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
042import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor;
043import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
044import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
045import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
046import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
048import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
049import org.apache.oozie.executor.jpa.CoordJobsGetChangedJPAExecutor;
050import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
051import org.apache.oozie.executor.jpa.JPAExecutorException;
052import org.apache.oozie.util.DateUtils;
053import org.apache.oozie.util.MemoryLocks;
054import org.apache.oozie.util.StatusUtils;
055import org.apache.oozie.util.XLog;
056
057/**
 * StatusTransitService is scheduled to run at the configured interval.
059 * <p/>
060 * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0 (job
061 * done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
062 * SUCCEEDED.
063 */
064public class StatusTransitService implements Service {
065    public static final String CONF_PREFIX = Service.CONF_PREFIX + "StatusTransitService.";
066    public static final String CONF_STATUSTRANSIT_INTERVAL = CONF_PREFIX + "statusTransit.interval";
067    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = CONF_PREFIX + "backward.support.for.coord.status";
068    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = CONF_PREFIX + "backward.support.for.states.without.error";
069    private static int limit = -1;
070    private static Date lastInstanceStartTime = null;
071    private final static XLog LOG = XLog.getLog(StatusTransitRunnable.class);
072
073    /**
     * StatusTransitRunnable is the runnable which is scheduled to run at the configured interval.
075     * <p/>
076     * It is to update job's status according to its child actions' status. If all child actions' pending flag equals 0
077     * (job done), we reset the job's pending flag to 0. If all child actions are succeeded, we set the job's status to
078     * SUCCEEDED.
079     */
080    public static class StatusTransitRunnable implements Runnable {
081        private JPAService jpaService = null;
082        private MemoryLocks.LockToken lock;
083
084        public StatusTransitRunnable() {
085            jpaService = Services.get().get(JPAService.class);
086            if (jpaService == null) {
087                LOG.error("Missing JPAService");
088            }
089        }
090
091        @Override
092        public void run() {
093            try {
094                Date curDate = new Date(); // records the start time of this service run;
095
096                // first check if there is some other instance running;
097                lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(),
098                        lockTimeout);
099                if (lock == null) {
100                    LOG.info("This StatusTransitService instance"
101                            + " will not run since there is already an instance running");
102                }
103                else {
104                    LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
105                    // running coord jobs transit service
106                    coordTransit();
107                    // running bundle jobs transit service
108                    bundleTransit();
109
110                    lastInstanceStartTime = curDate;
111                }
112            }
113            catch (Exception ex) {
114                LOG.warn("Exception happened during StatusTransitRunnable ", ex);
115            }
116            finally {
117                // release lock;
118                if (lock != null) {
119                    lock.release();
120                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
121                }
122            }
123        }
124
125        public List<BundleJobBean> removeDuplicates(List<BundleJobBean> pendingJobList) {
126            Set<BundleJobBean> s = new TreeSet<BundleJobBean>(new Comparator<BundleJobBean>() {
127                @Override
128                public int compare(BundleJobBean b1, BundleJobBean b2) {
129                    if (b1.getId().equals(b2.getId())) {
130                        return 0;
131                    }
132                    else
133                        return 1;
134                }
135            });
136            s.addAll(pendingJobList);
137            return new ArrayList<BundleJobBean>(s);
138        }
139
140        /**
141         * Aggregate bundle actions' status to bundle jobs
142         *
143         * @throws JPAExecutorException thrown if failed in db updates or retrievals
144         * @throws CommandException thrown if failed to run commands
145         */
146        private void bundleTransit() throws JPAExecutorException, CommandException {
147            List<BundleJobBean> pendingJobCheckList = null;
148
149            if (lastInstanceStartTime == null) {
150                LOG.info("Running bundle status service first instance");
151                // this is the first instance, we need to check for all pending or running jobs;
152                pendingJobCheckList = jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
153            }
154            else {
155                LOG.info("Running bundle status service from last instance time =  "
156                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
157                // this is not the first instance, we should only check jobs that have actions been
158                // updated >= start time of last service run;
159                List<BundleActionBean> actionList = jpaService
160                        .execute(new BundleActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
161                Set<String> bundleIds = new HashSet<String>();
162                for (BundleActionBean action : actionList) {
163                    bundleIds.add(action.getBundleId());
164                }
165                pendingJobCheckList = new ArrayList<BundleJobBean>();
166                for (String bundleId : bundleIds.toArray(new String[bundleIds.size()])) {
167                    BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId));
168                    // Running bundle job might have pending false
169                    if (bundle.isPending() || bundle.getStatus().equals(Job.Status.RUNNING)
170                            || bundle.getStatus().equals(Job.Status.RUNNINGWITHERROR)
171                            || bundle.getStatus().equals(Job.Status.PAUSED)
172                            || bundle.getStatus().equals(Job.Status.PAUSEDWITHERROR)) {
173                        pendingJobCheckList.add(bundle);
174                    }
175                }
176            }
177            aggregateBundleJobsStatus(pendingJobCheckList);
178        }
179
180        private void aggregateBundleJobsStatus(List<BundleJobBean> bundleLists) throws JPAExecutorException,
181                CommandException {
182            if (bundleLists != null) {
183                    for (BundleJobBean bundleJob : bundleLists) {
184                        try {
185                            String jobId = bundleJob.getId();
186                            Job.Status[] bundleStatus = new Job.Status[1];
187                            bundleStatus[0] = bundleJob.getStatus();
188                            List<BundleActionBean> bundleActions = jpaService.execute(new BundleActionsGetStatusPendingJPAExecutor(
189                                    jobId));
190                            HashMap<Job.Status, Integer> bundleActionStatus = new HashMap<Job.Status, Integer>();
191                            boolean foundPending = false;
192                            for (BundleActionBean bAction : bundleActions) {
193                                    int counter = 0;
194                                    if (bundleActionStatus.containsKey(bAction.getStatus())) {
195                                        counter = bundleActionStatus.get(bAction.getStatus()) + 1;
196                                    }
197                                    else {
198                                        ++counter;
199                                    }
200                                    bundleActionStatus.put(bAction.getStatus(), counter);
201                                    if (bAction.getCoordId() == null
202                                            && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED)) {
203                                        (new BundleKillXCommand(jobId)).call();
204                                        LOG.info("Bundle job ["+ jobId
205                                                        + "] has been killed since one of its coordinator job failed submission.");
206                                    }
207
208                                 if (bAction.isPending()) {
209                                    foundPending = true;
210                                }
211                            }
212
213                            if (!foundPending && checkTerminalStatus(bundleActionStatus, bundleActions, bundleStatus)) {
214                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
215                                        + "' from '" + bundleJob.getStatus() + "'");
216                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
217                            }
218                            else if (!foundPending && checkPrepStatus(bundleActionStatus, bundleActions, bundleStatus)) {
219                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
220                                        + "' from '" + bundleJob.getStatus() + "'");
221                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
222                            }
223                            else if (checkPausedStatus(bundleActionStatus, bundleActions, bundleStatus)) {
224                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
225                                        + "' from '" + bundleJob.getStatus() + "'");
226                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
227                            }
228                            else if (checkSuspendStatus(bundleActionStatus, bundleActions, bundleStatus, foundPending)) {
229                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
230                                        + "' from '" + bundleJob.getStatus() + "'");
231                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
232                            }
233                            else if (checkRunningStatus(bundleActionStatus, bundleActions, bundleStatus)) {
234                                LOG.info("Set bundle job [" + jobId + "] status to '" + bundleStatus[0].toString()
235                                        + "' from '" + bundleJob.getStatus() + "'");
236                                updateBundleJob(foundPending, bundleJob, bundleStatus[0]);
237                            }
238                        }
239                        catch (Exception ex) {
240                            LOG.error("Exception happened during aggregate bundle job's status, job = "
241                                    + bundleJob.getId(), ex);
242                        }
243                    }
244                }
245
246        }
247
248        private void aggregateCoordJobsStatus(List<CoordinatorJobBean> CoordList) throws JPAExecutorException,
249                CommandException {
250            if (CoordList != null) {
251                Configuration conf = Services.get().getConf();
252                boolean backwardSupportForCoordStatus = conf.getBoolean(CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
253
254                for (CoordinatorJobBean coordJob : CoordList) {
255                    try {
256                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
257                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
258                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
259                            continue;
260                        }
261                        String jobId = coordJob.getId();
262                        Job.Status[] coordStatus = new Job.Status[1];
263                        coordStatus[0] = coordJob.getStatus();
264                        //Get count of Coordinator actions with pending true
265                        boolean isPending = false;
266                        int count = jpaService.execute(new CoordJobGetPendingActionsCountJPAExecutor(jobId));
267                        if (count > 0) {
268                             isPending = true;
269                        }
270                        // Get status of Coordinator actions
271                        List<CoordinatorAction.Status> coordActionStatusList = jpaService
272                                .execute(new CoordJobGetActionsStatusJPAExecutor(jobId));
273                        HashMap<CoordinatorAction.Status, Integer> coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
274
275                        for (CoordinatorAction.Status status : coordActionStatusList) {
276                            int counter = 0;
277                            if (coordActionStatus.containsKey(status)) {
278                                counter = coordActionStatus.get(status) + 1;
279                            }
280                            else {
281                                ++counter;
282                            }
283                            coordActionStatus.put(status, counter);
284                        }
285
286                        int nonPendingCoordActionsCount = coordActionStatusList.size();
287                        boolean isDoneMaterialization = coordJob.isDoneMaterialization();
288                        if ((isDoneMaterialization || coordStatus[0] == Job.Status.FAILED || coordStatus[0] == Job.Status.KILLED)
289                                && checkCoordTerminalStatus(coordActionStatus, nonPendingCoordActionsCount,
290                                        coordStatus, isDoneMaterialization)) {
291                            LOG.info("Set coordinator job [" + jobId + "] status to '" + coordStatus[0].toString()
292                                    + "' from '" + coordJob.getStatus() + "'");
293                            updateCoordJob(isPending, coordJob, coordStatus[0]);
294                        }
295                        else if (checkCoordPausedStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
296                            LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
297                                    + "' from '" + coordJob.getStatus() + "'");
298                            updateCoordJob(isPending, coordJob, coordStatus[0]);
299                        }
300                        else if(checkCoordSuspendStatus( coordActionStatus, nonPendingCoordActionsCount, coordStatus, coordJob.isDoneMaterialization(), isPending)) {
301                            LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
302                                    + "' from '" + coordJob.getStatus() + "'");
303                            updateCoordJob(isPending, coordJob, coordStatus[0]);
304                        }
305                        else if (checkCoordRunningStatus(coordActionStatus, nonPendingCoordActionsCount, coordStatus)) {
306                            LOG.info("Set coordinator job [" + jobId + "] status to " + coordStatus[0].toString()
307                                    + "' from '" + coordJob.getStatus() + "'");
308                            updateCoordJob(isPending, coordJob, coordStatus[0]);
309                        }
310                        else {
311                            checkCoordPending(isPending, coordJob, true);
312                        }
313                    }
314                    catch (Exception ex) {
315                        LOG.error("Exception happened during aggregate coordinator job's status, job = "
316                                + coordJob.getId(), ex);
317                    }
318                }
319
320            }
321        }
322
323        private boolean checkTerminalStatus(HashMap<Job.Status, Integer> bundleActionStatus,
324                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
325            boolean ret = false;
326            int totalValuesSucceed = 0;
327            if (bundleActionStatus.containsKey(Job.Status.SUCCEEDED)) {
328                totalValuesSucceed = bundleActionStatus.get(Job.Status.SUCCEEDED);
329            }
330            int totalValuesFailed = 0;
331            if (bundleActionStatus.containsKey(Job.Status.FAILED)) {
332                totalValuesFailed = bundleActionStatus.get(Job.Status.FAILED);
333            }
334            int totalValuesKilled = 0;
335            if (bundleActionStatus.containsKey(Job.Status.KILLED)) {
336                totalValuesKilled = bundleActionStatus.get(Job.Status.KILLED);
337            }
338
339            int totalValuesDoneWithError = 0;
340            if (bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)) {
341                totalValuesDoneWithError = bundleActionStatus.get(Job.Status.DONEWITHERROR);
342            }
343
344            if (bundleActions.size() == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesDoneWithError)) {
345                // If all the bundle actions are succeeded then bundle job should be succeeded.
346                if (bundleActions.size() == totalValuesSucceed) {
347                    bundleStatus[0] = Job.Status.SUCCEEDED;
348                    ret = true;
349                }
350                else if (bundleActions.size() == totalValuesKilled) {
351                    // If all the bundle actions are KILLED then bundle job should be KILLED.
352                    bundleStatus[0] = Job.Status.KILLED;
353                    ret = true;
354                }
355                else if (bundleActions.size() == totalValuesFailed) {
356                    // If all the bundle actions are FAILED then bundle job should be FAILED.
357                    bundleStatus[0] = Job.Status.FAILED;
358                    ret = true;
359                }
360                else {
361                    bundleStatus[0] = Job.Status.DONEWITHERROR;
362                    ret = true;
363                }
364            }
365            return ret;
366        }
367
368        private boolean checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
369                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization) {
370            boolean ret = false;
371            int totalValuesSucceed = 0;
372            if (coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
373                totalValuesSucceed = coordActionStatus.get(CoordinatorAction.Status.SUCCEEDED);
374            }
375            int totalValuesFailed = 0;
376            if (coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)) {
377                totalValuesFailed = coordActionStatus.get(CoordinatorAction.Status.FAILED);
378            }
379            int totalValuesKilled = 0;
380            if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
381                totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
382            }
383
384            int totalValuesTimeOut = 0;
385            if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
386                totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
387            }
388
389            if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
390                // If all the coordinator actions are succeeded then coordinator job should be succeeded.
391                if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
392                    coordStatus[0] = Job.Status.SUCCEEDED;
393                    ret = true;
394                }
395                else if (coordActionsCount == totalValuesKilled) {
396                    // If all the coordinator actions are KILLED then coordinator job should be KILLED.
397                    coordStatus[0] = Job.Status.KILLED;
398                    ret = true;
399                }
400                else if (coordActionsCount == totalValuesFailed) {
401                    // If all the coordinator actions are FAILED then coordinator job should be FAILED.
402                    coordStatus[0] = Job.Status.FAILED;
403                    ret = true;
404                }
405                else {
406                    coordStatus[0] = Job.Status.DONEWITHERROR;
407                    ret = true;
408                }
409            }
410            return ret;
411        }
412
413        private boolean checkPrepStatus(HashMap<Job.Status, Integer> bundleActionStatus,
414                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
415            boolean ret = false;
416            if (bundleActionStatus.containsKey(Job.Status.PREP)) {
417                // If all the bundle actions are PREP then bundle job should be RUNNING.
418                if (bundleActions.size() > bundleActionStatus.get(Job.Status.PREP)) {
419                    bundleStatus[0] = Job.Status.RUNNING;
420                    ret = true;
421                }
422            }
423            return ret;
424        }
425
426        private boolean checkPausedStatus(HashMap<Job.Status, Integer> bundleActionStatus,
427                List<BundleActionBean> bundleActions, Job.Status[] bundleJobStatus) {
428            boolean ret = false;
429
430            // TODO - When bottom up cmds are allowed to change the status of parent job,
431            // if none of the bundle actions are in paused or pausedwitherror, the function should return
432            // false
433
434            // top down
435            // If the bundle job is PAUSED or PAUSEDINERROR and no children are in error
436            // state, then job should be PAUSED otherwise it should be pausedwitherror
437            if (bundleJobStatus[0] == Job.Status.PAUSED || bundleJobStatus[0] == Job.Status.PAUSEDWITHERROR) {
438                if (bundleActionStatus.containsKey(Job.Status.KILLED)
439                        || bundleActionStatus.containsKey(Job.Status.FAILED)
440                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
441                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
442                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)
443                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
444                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
445                }
446                else {
447                    bundleJobStatus[0] = Job.Status.PAUSED;
448                }
449                ret = true;
450            }
451
452            // bottom up; check the status of parent through their children
453            else if (bundleActionStatus.containsKey(Job.Status.PAUSED)
454                    && (bundleActions.size() == bundleActionStatus.get(Job.Status.PAUSED))) {
455                bundleJobStatus[0] = Job.Status.PAUSED;
456                ret = true;
457            }
458            else if (bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
459                int pausedActions = bundleActionStatus.containsKey(Job.Status.PAUSED) ? bundleActionStatus
460                        .get(Job.Status.PAUSED) : 0;
461                if (bundleActions.size() == pausedActions + bundleActionStatus.get(Job.Status.PAUSEDWITHERROR)) {
462                    bundleJobStatus[0] = Job.Status.PAUSEDWITHERROR;
463                    ret = true;
464                }
465            }
466            else {
467                ret = false;
468            }
469            return ret;
470        }
471
472
473        private boolean checkSuspendStatus(HashMap<Job.Status, Integer> bundleActionStatus,
474                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus, boolean isPending) {
475            boolean ret = false;
476
477            // TODO - When bottom up cmds are allowed to change the status of parent job,
478            // if none of the bundle actions are in suspended or suspendedwitherror, the function should return
479            // false
480
481            // top down
482            // if job is suspended
483            if (bundleStatus[0] == Job.Status.SUSPENDED
484                    || bundleStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
485                if (bundleActionStatus.containsKey(Job.Status.KILLED)
486                        || bundleActionStatus.containsKey(Job.Status.FAILED)
487                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
488                        || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)
489                        || bundleActionStatus.containsKey(Job.Status.PAUSEDWITHERROR)) {
490                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
491                }
492                else {
493                    bundleStatus[0] = Job.Status.SUSPENDED;
494                }
495                ret =true;
496            }
497
498            // bottom up
499            // Update status of parent from the status of its children
500            else if (!isPending && bundleActionStatus.containsKey(Job.Status.SUSPENDED)
501                    || bundleActionStatus.containsKey(Job.Status.SUSPENDEDWITHERROR)) {
502                int succeededActions = bundleActionStatus.containsKey(Job.Status.SUCCEEDED) ? bundleActionStatus
503                        .get(Job.Status.SUCCEEDED) : 0;
504                int killedActions = bundleActionStatus.containsKey(Job.Status.KILLED) ? bundleActionStatus
505                        .get(Job.Status.KILLED) : 0;
506                int failedActions = bundleActionStatus.containsKey(Job.Status.FAILED) ? bundleActionStatus
507                        .get(Job.Status.FAILED) : 0;
508                int doneWithErrorActions = bundleActionStatus.containsKey(Job.Status.DONEWITHERROR) ? bundleActionStatus
509                        .get(Job.Status.DONEWITHERROR) : 0;
510
511                if (bundleActions.size() == bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions) {
512                    bundleStatus[0] = Job.Status.SUSPENDED;
513                    ret = true;
514                }
515                else if (bundleActions.size()  == bundleActionStatus.get(Job.Status.SUSPENDEDWITHERROR)
516                        + bundleActionStatus.get(Job.Status.SUSPENDED) + succeededActions + killedActions + failedActions + doneWithErrorActions) {
517                    bundleStatus[0] = Job.Status.SUSPENDEDWITHERROR;
518                    ret = true;
519                }
520            }
521            return ret;
522
523        }
524
525        private boolean checkCoordPausedStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
526                int coordActionsCount, Job.Status[] coordStatus){
527            boolean ret = false;
528            if (coordStatus[0].equals(Job.Status.PAUSED) || coordStatus[0].equals(Job.Status.PAUSEDWITHERROR)) {
529                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
530                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
531                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
532                    coordStatus[0] = Job.Status.PAUSEDWITHERROR;
533                }
534                else {
535                    coordStatus[0] = Job.Status.PAUSED;
536                }
537                ret = true;
538            }
539            return ret;
540        }
541        private boolean checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
542                int coordActionsCount, Job.Status[] coordStatus, boolean isDoneMaterialization, boolean isPending) {
543            boolean ret = false;
544
545            // TODO - When bottom up cmds are allowed to change the status of parent job
546            //if none of the coord actions are in suspended or suspendedwitherror and materialization done is false
547            //,then the function should return
548            // false
549
550            // top down
551            // check for children only when parent is suspended
552            if (coordStatus[0] == Job.Status.SUSPENDED || coordStatus[0] == Job.Status.SUSPENDEDWITHERROR) {
553
554                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
555                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
556                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
557                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
558                }
559                else {
560                    coordStatus[0] = Job.Status.SUSPENDED;
561                }
562                ret = true;
563            }
564            // bottom up
565            // look for children to check the parent's status only if materialization is
566            // done and all actions are non-pending
567            else if (isDoneMaterialization && !isPending && coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
568                int succeededActions = coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED) ? coordActionStatus
569                       .get(CoordinatorAction.Status.SUCCEEDED) : 0;
570                int killedActions = coordActionStatus.containsKey(CoordinatorAction.Status.KILLED) ? coordActionStatus
571                        .get(CoordinatorAction.Status.KILLED) : 0;
572                int failedActions = coordActionStatus.containsKey(CoordinatorAction.Status.FAILED) ? coordActionStatus
573                        .get(CoordinatorAction.Status.FAILED) : 0;
574                int timedoutActions = coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT) ? coordActionStatus
575                        .get(CoordinatorAction.Status.TIMEDOUT) : 0;
576
577                if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED) + succeededActions) {
578                    coordStatus[0] = Job.Status.SUSPENDED;
579                    ret = true;
580                }
581                else if (coordActionsCount == coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)
582                        + succeededActions + killedActions + failedActions + timedoutActions) {
583                    coordStatus[0] = Job.Status.SUSPENDEDWITHERROR;
584                    ret = true;
585                }
586            }
587            return ret;
588        }
589
590        private boolean checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer> coordActionStatus,
591                int coordActionsCount, Job.Status[] coordStatus) {
592            boolean ret = false;
593            if (coordStatus[0] != Job.Status.PREP) {
594                if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)
595                        || coordActionStatus.containsKey(CoordinatorAction.Status.FAILED)
596                        || coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
597                    coordStatus[0] = Job.Status.RUNNINGWITHERROR;
598                }
599                else {
600                    coordStatus[0] = Job.Status.RUNNING;
601                }
602                ret = true;
603            }
604            return ret;
605        }
606
607        private boolean checkRunningStatus(HashMap<Job.Status, Integer> bundleActionStatus,
608                List<BundleActionBean> bundleActions, Job.Status[] bundleStatus) {
609            boolean ret = false;
610            if (bundleStatus[0] != Job.Status.PREP) {
611                if (bundleActionStatus.containsKey(Job.Status.FAILED)
612                        || bundleActionStatus.containsKey(Job.Status.KILLED)
613                        || bundleActionStatus.containsKey(Job.Status.DONEWITHERROR)
614                        || bundleActionStatus.containsKey(Job.Status.RUNNINGWITHERROR)) {
615                    bundleStatus[0] = Job.Status.RUNNINGWITHERROR;
616                }
617                else {
618                    bundleStatus[0] = Job.Status.RUNNING;
619                }
620                ret = true;
621            }
622            return ret;
623
624        }
625
626        private void updateBundleJob(boolean isPending, BundleJobBean bundleJob, Job.Status bundleStatus)
627                throws JPAExecutorException {
628            String jobId = bundleJob.getId();
629            // Update the Bundle Job
630            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
631            // PAUSEDWITHERROR is not supported
632            bundleJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(bundleStatus));
633            if (isPending) {
634                bundleJob.setPending();
635                LOG.info("Bundle job [" + jobId + "] Pending set to TRUE");
636            }
637            else {
638                bundleJob.resetPending();
639                LOG.info("Bundle job [" + jobId + "] Pending set to FALSE");
640            }
641            jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob));
642        }
643
644        private void updateCoordJob(boolean isPending, CoordinatorJobBean coordJob, Job.Status coordStatus)
645                throws JPAExecutorException, CommandException {
646            Job.Status prevStatus = coordJob.getStatus();
647            // Update the Coord Job
648            if (coordJob.getStatus() == Job.Status.SUCCEEDED || coordJob.getStatus() == Job.Status.FAILED
649                    || coordJob.getStatus() == Job.Status.KILLED || coordJob.getStatus() == Job.Status.DONEWITHERROR) {
650                if (coordStatus == Job.Status.SUSPENDED || coordStatus == Job.Status.SUSPENDEDWITHERROR) {
651                    LOG.info("Coord Job [" + coordJob.getId()
652                            + "] status to "+ coordStatus +" can not be updated as its already in Terminal state");
653                    return;
654                }
655            }
656
657            boolean isPendingStateChanged = checkCoordPending(isPending, coordJob, false);
658            // Check for backward support when RUNNINGWITHERROR, SUSPENDEDWITHERROR and PAUSEDWITHERROR is
659            // not supported
660            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(coordStatus));
661            // Backward support when coordinator namespace is 0.1
662            coordJob.setStatus(StatusUtils.getStatus(coordJob));
663            if (coordJob.getStatus() != prevStatus || isPendingStateChanged) {
664                LOG.debug("Updating coord job " + coordJob.getId());
665                coordJob.setLastModifiedTime(new Date());
666                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
667            }
668            // update bundle action only when status changes in coord job
669            if (coordJob.getBundleId() != null) {
670                if (!prevStatus.equals(coordJob.getStatus())) {
671                    BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
672                    bundleStatusUpdate.call();
673                }
674            }
675        }
676
677        private boolean checkCoordPending(boolean isPending, CoordinatorJobBean coordJob, boolean saveToDB)
678                throws JPAExecutorException {
679            // Checking the coordinator pending should be updated or not
680            boolean prevPending = coordJob.isPending();
681            if (isPending) {
682                coordJob.setPending();
683                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to TRUE");
684            }
685            else {
686                coordJob.resetPending();
687                LOG.info("Coord job [" + coordJob.getId() + "] Pending set to FALSE");
688            }
689            boolean hasChange = prevPending != coordJob.isPending();
690            if (saveToDB && hasChange) {
691                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
692            }
693            return hasChange;
694
695        }
696
697        /**
698         * Aggregate coordinator actions' status to coordinator jobs
699         *
700         * @throws JPAExecutorException thrown if failed in db updates or retrievals
701         * @throws CommandException thrown if failed to run commands
702         */
703        private void coordTransit() throws JPAExecutorException, CommandException {
704            List<CoordinatorJobBean> pendingJobCheckList = null;
705            if (lastInstanceStartTime == null) {
706                LOG.info("Running coordinator status service first instance");
707                // this is the first instance, we need to check for all pending jobs;
708                pendingJobCheckList = jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
709            }
710            else {
711                LOG.info("Running coordinator status service from last instance time =  "
712                        + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
713                // this is not the first instance, we should only check jobs
714                // that have actions or jobs been
715                // updated >= start time of last service run;
716                List<String> coordJobIdList = jpaService
717                        .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
718                Set<String> coordIds = new HashSet<String>();
719                for (String coordJobId : coordJobIdList) {
720                    coordIds.add(coordJobId);
721                }
722                coordIds.addAll(jpaService.execute(new CoordJobsGetChangedJPAExecutor(lastInstanceStartTime)));
723
724                pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
725                for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
726                    CoordinatorJobBean coordJob;
727                    try{
728                        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
729                    }
730                    catch (JPAExecutorException jpaee) {
731                        if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
732                            LOG.warn("Exception happened during StatusTransitRunnable; Coordinator Job doesn't exist", jpaee);
733                            continue;
734                        } else {
735                            throw jpaee;
736                        }
737                    }
738                    // Running coord job might have pending false
739                    Job.Status coordJobStatus = coordJob.getStatus();
740                    if (coordJob.isPending() || coordJobStatus.equals(Job.Status.PAUSED)
741                            || coordJobStatus.equals(Job.Status.RUNNING)
742                            || coordJobStatus.equals(Job.Status.RUNNINGWITHERROR)
743                            || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
744                        pendingJobCheckList.add(coordJob);
745                    }
746                }
747            }
748            aggregateCoordJobsStatus(pendingJobCheckList);
749        }
750    }
751
752    /**
753     * Initializes the {@link StatusTransitService}.
754     *
755     * @param services services instance.
756     */
757    @Override
758    public void init(Services services) {
759        Configuration conf = services.getConf();
760        Runnable stateTransitRunnable = new StatusTransitRunnable();
761        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10,
762                conf.getInt(CONF_STATUSTRANSIT_INTERVAL, 60), SchedulerService.Unit.SEC);
763    }
764
765    /**
766     * Destroy the StateTransit Jobs Service.
767     */
768    @Override
769    public void destroy() {
770    }
771
772    /**
773     * Return the public interface for the purge jobs service.
774     *
775     * @return {@link StatusTransitService}.
776     */
777    @Override
778    public Class<? extends Service> getInterface() {
779        return StatusTransitService.class;
780    }
781}
782