001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.sla;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Date;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.AppType;
032import org.apache.oozie.CoordinatorActionBean;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.WorkflowActionBean;
035import org.apache.oozie.WorkflowJobBean;
036import org.apache.oozie.client.CoordinatorAction;
037import org.apache.oozie.client.WorkflowAction;
038import org.apache.oozie.client.WorkflowJob;
039import org.apache.oozie.client.event.JobEvent;
040import org.apache.oozie.client.event.SLAEvent.EventStatus;
041import org.apache.oozie.client.event.SLAEvent.SLAStatus;
042import org.apache.oozie.client.rest.JsonBean;
043import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
046import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
047
048import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
049import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
050import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
051import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
052import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
053import org.apache.oozie.service.EventHandlerService;
054import org.apache.oozie.service.JPAService;
055import org.apache.oozie.service.ServiceException;
056import org.apache.oozie.service.Services;
057import org.apache.oozie.sla.service.SLAService;
058import org.apache.oozie.util.XLog;
059
060
061/**
062 * Implementation class for SLACalculator that calculates SLA related to
063 * start/end/duration of jobs using a memory-based map
064 */
065public class SLACalculatorMemory implements SLACalculator {
066
067    private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
068    // TODO optimization priority based insertion/processing/bumping up-down
069    private static Map<String, SLACalcStatus> slaMap;
070    private static Set<String> historySet;
071    private static int capacity;
072    private static JPAService jpaService;
073    private EventHandlerService eventHandler;
074    private static int modifiedAfter;
075    private static long jobEventLatency;
076
077    @Override
078    public void init(Configuration conf) throws ServiceException {
079        capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
080        jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 1000);
081        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
082        historySet = Collections.synchronizedSet(new HashSet<String>());
083        jpaService = Services.get().get(JPAService.class);
084        eventHandler = Services.get().get(EventHandlerService.class);
085        // load events modified after
086        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
087        loadOnRestart();
088
089    }
090
091    private void loadOnRestart() {
092        boolean isJobModified = false;
093        try {
094            long slaPendingCount = 0;
095            long statusPendingCount = 0;
096            List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
097                    modifiedAfter));
098            for (SLASummaryBean summaryBean : summaryBeans) {
099                String jobId = summaryBean.getId();
100                try {
101                    switch (summaryBean.getAppType()) {
102                        case COORDINATOR_ACTION:
103                            isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
104                            break;
105                        case WORKFLOW_ACTION:
106                            isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
107                            break;
108                        case WORKFLOW_JOB:
109                            isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
110                            break;
111                        default:
112                            break;
113                    }
114                    if (isJobModified) {
115                        jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
116                    }
117                }
118                catch (Exception e) {
119                    LOG.warn("Failed to load records for " + jobId, e);
120                }
121                try {
122                    if (summaryBean.getEventProcessed() == 7) {
123                        historySet.add(jobId);
124                        statusPendingCount++;
125                    }
126                    else if (summaryBean.getEventProcessed() <= 7) {
127                        SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
128                                jobId));
129                        SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
130                        slaMap.put(jobId, slaCalcStatus);
131                        slaPendingCount++;
132                    }
133                }
134                catch (Exception e) {
135                    LOG.warn("Failed to fetch/update records for " + jobId, e);
136                }
137
138            }
139            LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
140
141        }
142        catch (Exception e) {
143            LOG.warn("Failed to retrieve SLASummary records on restart", e);
144        }
145    }
146
147    private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
148            throws JPAExecutorException {
149        boolean isJobModified = false;
150        CoordinatorActionBean coordAction = null;
151        coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
152        if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
153            LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
154                    + summaryBean.getJobStatus());
155            isJobModified = true;
156            summaryBean.setJobStatus(coordAction.getStatusStr());
157            if (coordAction.isTerminalStatus()) {
158                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
159                        .getExternalId()));
160                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
161                        coordAction.getStatusStr());
162            }
163            else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
164                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
165                        .getExternalId()));
166                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
167            }
168        }
169        return isJobModified;
170    }
171
172    private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
173            throws JPAExecutorException {
174        boolean isJobModified = false;
175        WorkflowActionBean wfAction = null;
176        wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
177        if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
178            LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is "
179                    + summaryBean.getJobStatus());
180            isJobModified = true;
181            summaryBean.setJobStatus(wfAction.getStatusStr());
182            if (wfAction.inTerminalState()) {
183                setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
184            }
185            else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
186                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
187            }
188        }
189        return isJobModified;
190    }
191
192    private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
193            throws JPAExecutorException {
194        boolean isJobModified = false;
195        WorkflowJobBean wfJob = null;
196        wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
197        if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
198            LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is "
199                    + summaryBean.getJobStatus());
200            isJobModified = true;
201            summaryBean.setJobStatus(wfJob.getStatusStr());
202            if (wfJob.inTerminalState()) {
203                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
204            }
205            else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
206                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
207            }
208        }
209        return isJobModified;
210    }
211
212    private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
213        byte eventProc = summaryBean.getEventProcessed();
214        summaryBean.setEventProcessed(8);
215        summaryBean.setActualStart(startTime);
216        summaryBean.setActualEnd(endTime);
217        long actualDuration = endTime.getTime() - startTime.getTime();
218        summaryBean.setActualDuration(actualDuration);
219        if (eventProc < 4) {
220            if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
221                    || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
222                if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
223                    summaryBean.setSLAStatus(SLAStatus.MET);
224                }
225                else {
226                    summaryBean.setSLAStatus(SLAStatus.MISS);
227                }
228            }
229            else {
230                summaryBean.setSLAStatus(SLAStatus.MISS);
231            }
232        }
233
234    }
235
236    private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
237        if (((eventProc & 1) == 0)) {
238            eventProc += 1;
239            summaryBean.setEventProcessed(eventProc);
240        }
241        if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
242            summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
243        }
244        summaryBean.setActualStart(startTime);
245    }
246
247    @Override
248    public int size() {
249        return slaMap.size();
250    }
251
252    @Override
253    public SLACalcStatus get(String jobId) throws JPAExecutorException {
254        SLACalcStatus memObj;
255        memObj = slaMap.get(jobId);
256        if (memObj == null && historySet.contains(jobId)) {
257            memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
258                    jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
259        }
260        return memObj;
261    }
262
263    @Override
264    public Iterator<String> iterator() {
265        return slaMap.keySet().iterator();
266    }
267
268    @Override
269    public boolean isEmpty() {
270        return slaMap.isEmpty();
271    }
272
273    @Override
274    public void clear() {
275        slaMap.clear();
276        historySet.clear();
277    }
278
279    /**
280     * Invoked via periodic run, update the SLA for registered jobs
281     */
282    protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
283        SLACalcStatus slaCalc = slaMap.get(jobId);
284        synchronized (slaCalc) {
285            boolean change = false;
286            byte eventProc = slaCalc.getEventProcessed();
287            SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
288            // calculation w.r.t current time and status
289            if ((eventProc & 1) == 0) { // first bit (start-processed) unset
290                if (reg.getExpectedStart() != null) {
291                    if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
292                        confirmWithDB(slaCalc);
293                        eventProc = slaCalc.getEventProcessed();
294                        if (eventProc != 8 && (eventProc & 1 ) == 0) {
295                            //Some DB exception
296                            slaCalc.setEventStatus(EventStatus.START_MISS);
297                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
298                            eventProc++;
299                        }
300                        change = true;
301                    }
302                }
303                else {
304                    eventProc++; //disable further processing for optional start sla condition
305                    change = true;
306                }
307            }
308            if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
309                if (reg.getExpectedDuration() == -1) {
310                    eventProc += 2;
311                    change = true;
312                }
313                else if (slaCalc.getActualStart() != null) {
314                    if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
315                            .getActualStart().getTime())) {
316                        slaCalc.setEventProcessed(eventProc);
317                        confirmWithDB(slaCalc);
318                        eventProc = slaCalc.getEventProcessed();
319                        if (eventProc != 8 && ((eventProc >> 1) & 1 ) == 0) {
320                            //Some DB exception
321                            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
322                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
323                            eventProc += 2;
324                        }
325                        change = true;
326                    }
327                }
328            }
329            if (eventProc < 4) {
330                if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
331                    slaCalc.setEventProcessed(eventProc);
332                    confirmWithDB(slaCalc);
333                    eventProc = slaCalc.getEventProcessed();
334                    change = true;
335                }
336            }
337            if (change) {
338                if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
339                    eventProc = 8;
340                    slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
341                    slaMap.remove(jobId);
342                }
343                else {
344                    slaCalc.setEventProcessed(eventProc);
345                }
346                SLASummaryBean slaSummaryBean = new SLASummaryBean();
347                slaSummaryBean.setId(slaCalc.getId());
348                slaSummaryBean.setEventProcessed(eventProc);
349                slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
350                slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
351                slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
352                slaSummaryBean.setActualStart(slaCalc.getActualStart());
353                slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
354                slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
355                jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
356                if (eventProc == 7) {
357                    historySet.add(jobId);
358                    slaMap.remove(jobId);
359                    LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
360                }
361
362            }
363        }
364    }
365
366    /**
367     * Periodically run by the SLAService worker threads to update SLA status by
368     * iterating through all the jobs in the map
369     */
370    @Override
371    public void updateAllSlaStatus() {
372        LOG.info("Running periodic SLA check");
373        Iterator<String> iterator = slaMap.keySet().iterator();
374        while (iterator.hasNext()) {
375            String jobId = iterator.next();
376            try {
377                LOG.trace("Processing SLA for jobid={0}", jobId);
378                updateJobSla(jobId);
379            }
380            catch (Exception e) {
381                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
382            }
383        }
384    }
385
386    /**
387     * Register a new job into the map for SLA tracking
388     */
389    @Override
390    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
391        try {
392            if (slaMap.size() < capacity) {
393                SLACalcStatus slaCalc = new SLACalcStatus(reg);
394                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
395                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
396                slaMap.put(jobId, slaCalc);
397                List<JsonBean> insertList = new ArrayList<JsonBean>();
398                insertList.add(reg);
399                insertList.add(new SLASummaryBean(slaCalc));
400                jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
401                LOG.trace("SLA Registration Event - Job:" + jobId);
402                return true;
403            }
404            else {
405                LOG.error(
406                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
407                        reg.getId());
408            }
409        }
410        catch (JPAExecutorException jpa) {
411            throw jpa;
412        }
413        return false;
414    }
415
416    private String getJobStatus(AppType appType) {
417        String status = null;
418        switch (appType) {
419            case COORDINATOR_ACTION:
420                status = CoordinatorAction.Status.WAITING.name();
421                break;
422            case WORKFLOW_ACTION:
423                status = WorkflowAction.Status.PREP.name();
424                break;
425            case WORKFLOW_JOB:
426                status = WorkflowJob.Status.PREP.name();
427                break;
428            default:
429                break;
430        }
431        return status;
432    }
433
434    /**
435     * Update job into the map for SLA tracking
436     */
437    @Override
438    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
439        try {
440            if (slaMap.size() < capacity) {
441                SLACalcStatus slaCalc = new SLACalcStatus(reg);
442                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
443                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
444                slaMap.put(jobId, slaCalc);
445                List<JsonBean> updateList = new ArrayList<JsonBean>();
446                updateList.add(reg);
447                updateList.add(new SLASummaryBean(slaCalc));
448                jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
449                LOG.trace("SLA Registration Event - Job:" + jobId);
450                return true;
451            }
452            else {
453                LOG.error(
454                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
455                        reg.getId());
456            }
457        }
458        catch (JPAExecutorException jpa) {
459            throw jpa;
460        }
461        return false;
462    }
463
464    /**
465     * Remove job from being tracked in map
466     */
467    @Override
468    public void removeRegistration(String jobId) {
469        if (slaMap.remove(jobId) == null) {
470            historySet.remove(jobId);
471        }
472    }
473
474    /**
475     * Triggered after receiving Job status change event, update SLA status
476     * accordingly
477     */
478    @Override
479    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
480            Date endTime) throws JPAExecutorException, ServiceException {
481        SLACalcStatus slaCalc = slaMap.get(jobId);
482        SLASummaryBean slaInfo = null;
483        boolean hasSla = false;
484        if (slaCalc != null) {
485            synchronized (slaCalc) {
486                slaCalc.setJobStatus(jobStatus);
487                switch (jobEventStatus) {
488                    case STARTED:
489                        slaInfo = processJobStartSLA(slaCalc, startTime);
490                        break;
491                    case SUCCESS:
492                        slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
493                        break;
494                    case FAILURE:
495                        slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
496                        break;
497                    default:
498                        LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
499                        slaInfo = getSLASummaryBean(slaCalc);
500                }
501
502                if (slaCalc.getEventProcessed() == 7) {
503                    slaInfo.setEventProcessed(8);
504                    slaMap.remove(jobId);
505                }
506                hasSla = true;
507            }
508            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
509        }
510        else if (historySet.contains(jobId)) {
511            slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
512            if (slaInfo == null) {
513                throw new JPAExecutorException(ErrorCode.E0604, jobId);
514            }
515            slaInfo.setJobStatus(jobStatus);
516            slaInfo.setActualStart(startTime);
517            slaInfo.setActualEnd(endTime);
518            if (endTime != null) {
519                slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
520            }
521            slaInfo.setEventProcessed(8);
522            historySet.remove(jobId);
523            hasSla = true;
524        }
525
526        if (hasSla) {
527            jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
528        }
529        return hasSla;
530    }
531
532    /**
533     * Process SLA for jobs that started running. Also update actual-start time
534     *
535     * @param slaCalc
536     * @param actualStart
537     * @return SLASummaryBean
538     */
539    private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
540        slaCalc.setActualStart(actualStart);
541        if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
542            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
543        }
544        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
545        Date expecStart = reg.getExpectedStart();
546        byte eventProc = slaCalc.getEventProcessed();
547        // set event proc here
548        if (((eventProc & 1) == 0)) {
549            if (expecStart != null) {
550                if (actualStart.getTime() > expecStart.getTime()) {
551                    slaCalc.setEventStatus(EventStatus.START_MISS);
552                }
553                else {
554                    slaCalc.setEventStatus(EventStatus.START_MET);
555                }
556                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
557            }
558            eventProc += 1;
559            slaCalc.setEventProcessed(eventProc);
560        }
561        return getSLASummaryBean(slaCalc);
562    }
563
564    /**
565     * Process SLA for jobs that ended successfully. Also update actual-start
566     * and end time
567     *
568     * @param slaCalc
569     * @param actualStart
570     * @param actualEnd
571     * @return SLASummaryBean
572     * @throws JPAExecutorException
573     */
574    private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
575        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
576        slaCalc.setActualStart(actualStart);
577        slaCalc.setActualEnd(actualEnd);
578        long expectedDuration = reg.getExpectedDuration();
579        long actualDuration = actualEnd.getTime() - actualStart.getTime();
580        slaCalc.setActualDuration(actualDuration);
581        //check event proc
582        byte eventProc = slaCalc.getEventProcessed();
583        if (((eventProc >> 1) & 1) == 0) {
584            processDurationSLA(expectedDuration, actualDuration, slaCalc);
585            eventProc += 2;
586            slaCalc.setEventProcessed(eventProc);
587        }
588
589        if (eventProc < 4) {
590            Date expectedEnd = reg.getExpectedEnd();
591            if (actualEnd.getTime() > expectedEnd.getTime()) {
592                slaCalc.setEventStatus(EventStatus.END_MISS);
593                slaCalc.setSLAStatus(SLAStatus.MISS);
594            }
595            else {
596                slaCalc.setEventStatus(EventStatus.END_MET);
597                slaCalc.setSLAStatus(SLAStatus.MET);
598            }
599            eventProc += 4;
600            slaCalc.setEventProcessed(eventProc);
601            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
602        }
603        return getSLASummaryBean(slaCalc);
604    }
605
606    /**
607     * Process SLA for jobs that ended in failure. Also update actual-start and
608     * end time
609     *
610     * @param slaCalc
611     * @param actualStart
612     * @param actualEnd
613     * @return SLASummaryBean
614     * @throws JPAExecutorException
615     */
616    private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
617        slaCalc.setActualStart(actualStart);
618        slaCalc.setActualEnd(actualEnd);
619        if (actualStart == null) { // job failed before starting
620            if (slaCalc.getEventProcessed() < 4) {
621                slaCalc.setEventStatus(EventStatus.END_MISS);
622                slaCalc.setSLAStatus(SLAStatus.MISS);
623                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
624                slaCalc.setEventProcessed(7);
625                return getSLASummaryBean(slaCalc);
626            }
627        }
628        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
629        long expectedDuration = reg.getExpectedDuration();
630        long actualDuration = actualEnd.getTime() - actualStart.getTime();
631        slaCalc.setActualDuration(actualDuration);
632
633        byte eventProc = slaCalc.getEventProcessed();
634        if (((eventProc >> 1) & 1) == 0) {
635            if (expectedDuration != -1) {
636                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
637                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
638            }
639            eventProc += 2;
640            slaCalc.setEventProcessed(eventProc);
641        }
642        if (eventProc < 4) {
643            slaCalc.setEventStatus(EventStatus.END_MISS);
644            slaCalc.setSLAStatus(SLAStatus.MISS);
645            eventProc += 4;
646            slaCalc.setEventProcessed(eventProc);
647            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
648        }
649        return getSLASummaryBean(slaCalc);
650    }
651
652    private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
653        SLASummaryBean slaSummaryBean = new SLASummaryBean();
654        slaSummaryBean.setActualStart(slaCalc.getActualStart());
655        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
656        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
657        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
658        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
659        slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
660        slaSummaryBean.setId(slaCalc.getId());
661        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
662        return slaSummaryBean;
663    }
664
665    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
666        if (expected != -1 && actual > expected) {
667            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
668            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
669        }
670        else if (expected != -1 && actual <= expected) {
671            slaCalc.setEventStatus(EventStatus.DURATION_MET);
672            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
673        }
674    }
675
676    /*
677     * Confirm alerts against source of truth - DB. Also required in case of High Availability
678     */
679    private void confirmWithDB(SLACalcStatus slaCalc) {
680        boolean ended = false, isEndMiss = false;
681        try {
682            switch (slaCalc.getAppType()) {
683                case WORKFLOW_JOB:
684                    WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
685                    if (wf.getEndTime() != null) {
686                        ended = true;
687                        if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
688                                || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
689                            isEndMiss = true;
690                        }
691                    }
692                    slaCalc.setActualStart(wf.getStartTime());
693                    slaCalc.setActualEnd(wf.getEndTime());
694                    slaCalc.setJobStatus(wf.getStatusStr());
695                    break;
696                case WORKFLOW_ACTION:
697                    WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
698                    if (wa.getEndTime() != null) {
699                        ended = true;
700                        if (wa.isTerminalWithFailure()
701                                || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
702                            isEndMiss = true;
703                        }
704                    }
705                    slaCalc.setActualStart(wa.getStartTime());
706                    slaCalc.setActualEnd(wa.getEndTime());
707                    slaCalc.setJobStatus(wa.getStatusStr());
708                    break;
709                case COORDINATOR_ACTION:
710                    CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
711                    if (ca.isTerminalWithFailure()) {
712                        isEndMiss = ended = true;
713                        slaCalc.setActualStart(null);
714                        slaCalc.setActualEnd(ca.getLastModifiedTime());
715                    }
716                    if (ca.getExternalId() != null) {
717                        wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
718                        if (wf.getEndTime() != null) {
719                            ended = true;
720                            if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
721                                isEndMiss = true;
722                            }
723                        }
724                        slaCalc.setActualEnd(wf.getEndTime());
725                        slaCalc.setActualStart(wf.getStartTime());
726                    }
727                    slaCalc.setJobStatus(ca.getStatusStr());
728                    break;
729                default:
730                    LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
731            }
732
733            byte eventProc = slaCalc.getEventProcessed();
734            if (ended) {
735                if (isEndMiss) {
736                    slaCalc.setSLAStatus(SLAStatus.MISS);
737                }
738                else {
739                    slaCalc.setSLAStatus(SLAStatus.MET);
740                }
741                if (slaCalc.getActualStart() != null) {
742                    if ((eventProc & 1) == 0) {
743                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
744                            slaCalc.setEventStatus(EventStatus.START_MISS);
745                        }
746                        else {
747                            slaCalc.setEventStatus(EventStatus.START_MET);
748                        }
749                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
750                    }
751                    slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
752                    if (((eventProc >> 1) & 1) == 0) {
753                        processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
754                    }
755                }
756                if (eventProc < 4) {
757                    if (isEndMiss) {
758                        slaCalc.setEventStatus(EventStatus.END_MISS);
759                    }
760                    else {
761                        slaCalc.setEventStatus(EventStatus.END_MET);
762                    }
763                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
764                }
765                slaCalc.setEventProcessed(8);
766            }
767            else {
768                if (slaCalc.getActualStart() != null) {
769                    slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
770                }
771                if ((eventProc & 1) == 0) {
772                    if (slaCalc.getActualStart() != null) {
773                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
774                            slaCalc.setEventStatus(EventStatus.START_MISS);
775                        }
776                        else {
777                            slaCalc.setEventStatus(EventStatus.START_MET);
778                        }
779                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
780                        eventProc++;
781                    }
782                    else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
783                        slaCalc.setEventStatus(EventStatus.START_MISS);
784                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
785                        eventProc++;
786                    }
787                }
788                if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
789                        && slaCalc.getExpectedDuration() != -1) {
790                    if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
791                        slaCalc.setEventStatus(EventStatus.DURATION_MISS);
792                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
793                        eventProc += 2;
794                    }
795                }
796                if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
797                    slaCalc.setEventStatus(EventStatus.END_MISS);
798                    slaCalc.setSLAStatus(SLAStatus.MISS);
799                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
800                    eventProc += 4;
801                }
802                slaCalc.setEventProcessed(eventProc);
803            }
804        }
805        catch (Exception e) {
806            LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
807                    + e.getClass().getName() + ": " + e.getMessage());
808            if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
809                slaCalc.setEventStatus(EventStatus.END_MISS);
810                slaCalc.setSLAStatus(SLAStatus.MISS);
811                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
812                slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
813            }
814        }
815    }
816
817}