001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.event.BundleJobEvent;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.event.MemoryEventQueue;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.sla.listener.SLAEventListener;
import org.apache.oozie.util.XLog;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
045
046/**
047 * Service class that handles the events system - creating events queue,
048 * managing configured properties and managing and invoking various event
049 * listeners via worker threads
050 */
051public class EventHandlerService implements Service {
052
053    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
054    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
055    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
056    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
057    public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
058    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
059    public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
060    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
061
062    private static EventQueue eventQueue;
063    private XLog LOG;
064    private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
065    private Set<String> apptypes;
066    private static boolean eventsEnabled = false;
067    private int numWorkers;
068
069    @Override
070    public void init(Services services) throws ServiceException {
071        try {
072            Configuration conf = services.getConf();
073            LOG = XLog.getLog(getClass());
074            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
075            eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
076            eventQueue.init(conf);
077            // initialize app-types to switch on events for
078            initApptypes(conf);
079            // initialize event listeners
080            initEventListeners(conf);
081            // initialize worker threads via Scheduler
082            initWorkerThreads(conf, services);
083            eventsEnabled = true;
084            LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
085                    + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
086                    .getName(), listenerMap.toString(), apptypes, numWorkers);
087        }
088        catch (Exception ex) {
089            throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
090        }
091    }
092
093    private void initApptypes(Configuration conf) {
094        apptypes = new HashSet<String>();
095        for (String jobtype : conf.getStringCollection(CONF_FILTER_APP_TYPES)) {
096            String tmp = jobtype.trim().toLowerCase();
097            if (tmp.length() == 0) {
098                continue;
099            }
100            apptypes.add(tmp);
101        }
102    }
103
104    private void initEventListeners(Configuration conf) throws Exception {
105        Class<?>[] listenerClass = conf.getClasses(CONF_LISTENERS,
106                org.apache.oozie.jms.JMSJobEventListener.class,
107                org.apache.oozie.sla.listener.SLAJobEventListener.class);
108        for (int i = 0; i < listenerClass.length; i++) {
109            Object listener = null;
110            try {
111                listener = listenerClass[i].newInstance();
112            }
113            catch (InstantiationException e) {
114                LOG.warn("Could not create event listener instance, " + e);
115            }
116            catch (IllegalAccessException e) {
117                LOG.warn("Illegal access to event listener instance, " + e);
118            }
119            addEventListener(listener, conf, listenerClass[i].getName());
120        }
121    }
122
123    @SuppressWarnings({ "rawtypes", "unchecked" })
124    public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
125        if (listener instanceof JobEventListener) {
126            List listenersList = listenerMap.get(MessageType.JOB);
127            if (listenersList == null) {
128                listenersList = new ArrayList();
129                listenerMap.put(MessageType.JOB, listenersList);
130            }
131            listenersList.add(listener);
132            ((JobEventListener) listener).init(conf);
133        }
134        else if (listener instanceof SLAEventListener) {
135            List listenersList = listenerMap.get(MessageType.SLA);
136            if (listenersList == null) {
137                listenersList = new ArrayList();
138                listenerMap.put(MessageType.SLA, listenersList);
139            }
140            listenersList.add(listener);
141            ((SLAEventListener) listener).init(conf);
142        }
143        else {
144            LOG.warn("Event listener [{0}] is of undefined type", name);
145        }
146    }
147
148    public static boolean isEnabled() {
149        return eventsEnabled;
150    }
151
152    private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
153        numWorkers = conf.getInt(CONF_WORKER_THREADS, 3);
154        int interval = conf.getInt(CONF_WORKER_INTERVAL, 30);
155        SchedulerService ss = services.get(SchedulerService.class);
156        int available = ss.getSchedulableThreads(conf);
157        if (numWorkers + 3 > available) {
158            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
159                    + numWorkers + "] cannot be handled with current settings. Increase "
160                    + SchedulerService.SCHEDULER_THREADS);
161        }
162        Runnable eventWorker = new EventWorker();
163        // schedule staggered runnables every 1 min interval by default
164        for (int i = 0; i < numWorkers; i++) {
165            ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
166        }
167    }
168
169    @Override
170    public void destroy() {
171        eventsEnabled = false;
172        for (MessageType type : listenerMap.keySet()) {
173            Iterator<?> iter = listenerMap.get(type).iterator();
174            while (iter.hasNext()) {
175                if (type == MessageType.JOB) {
176                    ((JobEventListener) iter.next()).destroy();
177                }
178                else if (type == MessageType.SLA) {
179                    ((SLAEventListener) iter.next()).destroy();
180                }
181            }
182        }
183    }
184
185    @Override
186    public Class<? extends Service> getInterface() {
187        return EventHandlerService.class;
188    }
189
190    public boolean isSupportedApptype(String appType) {
191        if (!apptypes.contains(appType.toLowerCase())) {
192            return false;
193        }
194        return true;
195    }
196
197    public void setAppTypes(Set<String> types) {
198        apptypes = types;
199    }
200
201    public Set<String> getAppTypes() {
202        return apptypes;
203    }
204
205    public String listEventListeners() {
206        return listenerMap.toString();
207    }
208
209    public void queueEvent(Event event) {
210        LOG.debug("Queueing event : {0}", event);
211        LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
212        eventQueue.add(event);
213    }
214
215    public EventQueue getEventQueue() {
216        return eventQueue;
217    }
218
219    public class EventWorker implements Runnable {
220
221        @Override
222        public void run() {
223            if (Thread.currentThread().isInterrupted()) {
224                return;
225            }
226            try {
227                if (!eventQueue.isEmpty()) {
228                    List<Event> work = eventQueue.pollBatch();
229                    for (Event event : work) {
230                        LOG.debug("Processing event : {0}", event);
231                        MessageType msgType = event.getMsgType();
232                        List<?> listeners = listenerMap.get(msgType);
233                        if (listeners != null) {
234                            Iterator<?> iter = listeners.iterator();
235                            while (iter.hasNext()) {
236                                try {
237                                    if (msgType == MessageType.JOB) {
238                                        invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
239                                    }
240                                    else if (msgType == MessageType.SLA) {
241                                        invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
242                                    }
243                                    else {
244                                        iter.next();
245                                    }
246                                }
247                                catch (Throwable error) {
248                                    XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
249                                            error);
250                                }
251                            }
252                        }
253                    }
254                }
255            }
256            catch (Throwable error) {
257                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
258                        error);
259            }
260        }
261
262        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
263            switch (event.getAppType()) {
264                case WORKFLOW_JOB:
265                    jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
266                    break;
267                case WORKFLOW_ACTION:
268                    jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
269                    break;
270                case COORDINATOR_JOB:
271                    jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
272                    break;
273                case COORDINATOR_ACTION:
274                    jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
275                    break;
276                case BUNDLE_JOB:
277                    jobListener.onBundleJobEvent((BundleJobEvent)event);
278                    break;
279                default:
280                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
281                            event.getAppType());
282            }
283        }
284
285        private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
286            switch (event.getEventStatus()) {
287                case START_MET:
288                    slaListener.onStartMet(event);
289                    break;
290                case START_MISS:
291                    slaListener.onStartMiss(event);
292                    break;
293                case END_MET:
294                    slaListener.onEndMet(event);
295                    break;
296                case END_MISS:
297                    slaListener.onEndMiss(event);
298                    break;
299                case DURATION_MET:
300                    slaListener.onDurationMet(event);
301                    break;
302                case DURATION_MISS:
303                    slaListener.onDurationMiss(event);
304                    break;
305                default:
306                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
307            }
308        }
309    }
310
311}