001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.sla.service;
019
020import java.util.Date;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.client.event.JobEvent.EventStatus;
024import org.apache.oozie.executor.jpa.JPAExecutorException;
025import org.apache.oozie.service.EventHandlerService;
026import org.apache.oozie.service.SchedulerService;
027import org.apache.oozie.service.Service;
028import org.apache.oozie.service.ServiceException;
029import org.apache.oozie.service.Services;
030import org.apache.oozie.sla.SLACalculator;
031import org.apache.oozie.sla.SLACalculatorMemory;
032import org.apache.oozie.sla.SLARegistrationBean;
033import org.apache.oozie.util.XLog;
034import com.google.common.annotations.VisibleForTesting;
035
036public class SLAService implements Service {
037
038    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
039    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
040    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
041    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
042    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
043    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
044
045    private static SLACalculator calcImpl;
046    private static boolean slaEnabled = false;
047    private EventHandlerService eventHandler;
048    public static XLog LOG;
049    @Override
050    public void init(Services services) throws ServiceException {
051        try {
052            Configuration conf = services.getConf();
053            Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) conf.getClass(
054                    CONF_CALCULATOR_IMPL, null);
055            calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
056            calcImpl.init(conf);
057            eventHandler = Services.get().get(EventHandlerService.class);
058            if (eventHandler == null) {
059                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
060                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
061            }
062            LOG = XLog.getLog(getClass());
063            java.util.Set<String> appTypes = eventHandler.getAppTypes();
064            appTypes.add("workflow_action");
065            eventHandler.setAppTypes(appTypes);
066
067            Runnable slaThread = new SLAWorker(calcImpl);
068            // schedule runnable by default every 30 sec
069            services.get(SchedulerService.class).schedule(slaThread, 10, 30, SchedulerService.Unit.SEC);
070            slaEnabled = true;
071            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
072                    conf.get(SLAService.CONF_CAPACITY));
073        }
074        catch (Exception ex) {
075            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
076        }
077    }
078
079    @Override
080    public void destroy() {
081        slaEnabled = false;
082    }
083
084    @Override
085    public Class<? extends Service> getInterface() {
086        return SLAService.class;
087    }
088
089    public static boolean isEnabled() {
090        return slaEnabled;
091    }
092
093    public SLACalculator getSLACalculator() {
094        return calcImpl;
095    }
096
097    @VisibleForTesting
098    public void runSLAWorker() {
099        new SLAWorker(calcImpl).run();
100    }
101
102    private class SLAWorker implements Runnable {
103
104        SLACalculator calc;
105
106        public SLAWorker(SLACalculator calc) {
107            this.calc = calc;
108        }
109
110        @Override
111        public void run() {
112            if (Thread.currentThread().isInterrupted()) {
113                return;
114            }
115            try {
116                calc.updateAllSlaStatus();
117            }
118            catch (Throwable error) {
119                XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
120            }
121        }
122    }
123
124    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
125        try {
126            if (calcImpl.addRegistration(reg.getId(), reg)) {
127                return true;
128            }
129            else {
130                LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
131            }
132        }
133        catch (JPAExecutorException ex) {
134            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
135        }
136        return false;
137    }
138
139    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
140        try {
141            if (calcImpl.updateRegistration(reg.getId(), reg)) {
142                return true;
143            }
144            else {
145                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
146            }
147        }
148        catch (JPAExecutorException ex) {
149            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
150        }
151        return false;
152    }
153
154    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
155            throws ServiceException {
156        try {
157            if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
158                return true;
159            }
160        }
161        catch (JPAExecutorException jpe) {
162            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId);
163        }
164        return false;
165    }
166
167    public void removeRegistration(String jobId) {
168        calcImpl.removeRegistration(jobId);
169    }
170
171}