001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.event;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.LinkedBlockingQueue;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.apache.hadoop.classification.InterfaceAudience.Public;
031import org.apache.hadoop.classification.InterfaceStability.Evolving;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.service.AbstractService;
034import org.apache.hadoop.util.ShutdownHookManager;
035import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
036
037/**
038 * Dispatches {@link Event}s in a separate thread. Currently only single thread
039 * does that. Potentially there could be multiple channels for each event type
040 * class and a thread pool can be used to dispatch the events.
041 */
042@SuppressWarnings("rawtypes")
043@Public
044@Evolving
045public class AsyncDispatcher extends AbstractService implements Dispatcher {
046
047  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
048
049  private final BlockingQueue<Event> eventQueue;
050  private volatile boolean stopped = false;
051
052  // Configuration flag for enabling/disabling draining dispatcher's events on
053  // stop functionality.
054  private volatile boolean drainEventsOnStop = false;
055
056  // Indicates all the remaining dispatcher's events on stop have been drained
057  // and processed.
058  private volatile boolean drained = true;
059  private Object waitForDrained = new Object();
060
061  // For drainEventsOnStop enabled only, block newly coming events into the
062  // queue while stopping.
063  private volatile boolean blockNewEvents = false;
064  private EventHandler handlerInstance = null;
065
066  private Thread eventHandlingThread;
067  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
068  private boolean exitOnDispatchException;
069
070  public AsyncDispatcher() {
071    this(new LinkedBlockingQueue<Event>());
072  }
073
074  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
075    super("Dispatcher");
076    this.eventQueue = eventQueue;
077    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
078  }
079
080  Runnable createThread() {
081    return new Runnable() {
082      @Override
083      public void run() {
084        while (!stopped && !Thread.currentThread().isInterrupted()) {
085          drained = eventQueue.isEmpty();
086          // blockNewEvents is only set when dispatcher is draining to stop,
087          // adding this check is to avoid the overhead of acquiring the lock
088          // and calling notify every time in the normal run of the loop.
089          if (blockNewEvents) {
090            synchronized (waitForDrained) {
091              if (drained) {
092                waitForDrained.notify();
093              }
094            }
095          }
096          Event event;
097          try {
098            event = eventQueue.take();
099          } catch(InterruptedException ie) {
100            if (!stopped) {
101              LOG.warn("AsyncDispatcher thread interrupted", ie);
102            }
103            return;
104          }
105          if (event != null) {
106            dispatch(event);
107          }
108        }
109      }
110    };
111  }
112
113  @Override
114  protected void serviceInit(Configuration conf) throws Exception {
115    this.exitOnDispatchException =
116        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
117          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
118    super.serviceInit(conf);
119  }
120
121  @Override
122  protected void serviceStart() throws Exception {
123    //start all the components
124    super.serviceStart();
125    eventHandlingThread = new Thread(createThread());
126    eventHandlingThread.setName("AsyncDispatcher event handler");
127    eventHandlingThread.start();
128  }
129
130  public void setDrainEventsOnStop() {
131    drainEventsOnStop = true;
132  }
133
134  @Override
135  protected void serviceStop() throws Exception {
136    if (drainEventsOnStop) {
137      blockNewEvents = true;
138      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
139      synchronized (waitForDrained) {
140        while (!drained && eventHandlingThread.isAlive()) {
141          waitForDrained.wait(1000);
142          LOG.info("Waiting for AsyncDispatcher to drain.");
143        }
144      }
145    }
146    stopped = true;
147    if (eventHandlingThread != null) {
148      eventHandlingThread.interrupt();
149      try {
150        eventHandlingThread.join();
151      } catch (InterruptedException ie) {
152        LOG.warn("Interrupted Exception while stopping", ie);
153      }
154    }
155
156    // stop all the components
157    super.serviceStop();
158  }
159
160  @SuppressWarnings("unchecked")
161  protected void dispatch(Event event) {
162    //all events go thru this loop
163    if (LOG.isDebugEnabled()) {
164      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
165          + event.toString());
166    }
167
168    Class<? extends Enum> type = event.getType().getDeclaringClass();
169
170    try{
171      EventHandler handler = eventDispatchers.get(type);
172      if(handler != null) {
173        handler.handle(event);
174      } else {
175        throw new Exception("No handler for registered for " + type);
176      }
177    } catch (Throwable t) {
178      //TODO Maybe log the state of the queue
179      LOG.fatal("Error in dispatcher thread", t);
180      // If serviceStop is called, we should exit this thread gracefully.
181      if (exitOnDispatchException
182          && (ShutdownHookManager.get().isShutdownInProgress()) == false
183          && stopped == false) {
184        LOG.info("Exiting, bbye..");
185        System.exit(-1);
186      }
187    }
188  }
189
190  @SuppressWarnings("unchecked")
191  @Override
192  public void register(Class<? extends Enum> eventType,
193      EventHandler handler) {
194    /* check to see if we have a listener registered */
195    EventHandler<Event> registeredHandler = (EventHandler<Event>)
196    eventDispatchers.get(eventType);
197    LOG.info("Registering " + eventType + " for " + handler.getClass());
198    if (registeredHandler == null) {
199      eventDispatchers.put(eventType, handler);
200    } else if (!(registeredHandler instanceof MultiListenerHandler)){
201      /* for multiple listeners of an event add the multiple listener handler */
202      MultiListenerHandler multiHandler = new MultiListenerHandler();
203      multiHandler.addHandler(registeredHandler);
204      multiHandler.addHandler(handler);
205      eventDispatchers.put(eventType, multiHandler);
206    } else {
207      /* already a multilistener, just add to it */
208      MultiListenerHandler multiHandler
209      = (MultiListenerHandler) registeredHandler;
210      multiHandler.addHandler(handler);
211    }
212  }
213
214  @Override
215  public EventHandler getEventHandler() {
216    if (handlerInstance == null) {
217      handlerInstance = new GenericEventHandler();
218    }
219    return handlerInstance;
220  }
221
222  class GenericEventHandler implements EventHandler<Event> {
223    public void handle(Event event) {
224      if (blockNewEvents) {
225        return;
226      }
227      drained = false;
228
229      /* all this method does is enqueue all the events onto the queue */
230      int qSize = eventQueue.size();
231      if (qSize !=0 && qSize %1000 == 0) {
232        LOG.info("Size of event-queue is " + qSize);
233      }
234      int remCapacity = eventQueue.remainingCapacity();
235      if (remCapacity < 1000) {
236        LOG.warn("Very low remaining capacity in the event-queue: "
237            + remCapacity);
238      }
239      try {
240        eventQueue.put(event);
241      } catch (InterruptedException e) {
242        if (!stopped) {
243          LOG.warn("AsyncDispatcher thread interrupted", e);
244        }
245        throw new YarnRuntimeException(e);
246      }
247    };
248  }
249
250  /**
251   * Multiplexing an event. Sending it to different handlers that
252   * are interested in the event.
253   * @param <T> the type of event these multiple handlers are interested in.
254   */
255  static class MultiListenerHandler implements EventHandler<Event> {
256    List<EventHandler<Event>> listofHandlers;
257
258    public MultiListenerHandler() {
259      listofHandlers = new ArrayList<EventHandler<Event>>();
260    }
261
262    @Override
263    public void handle(Event event) {
264      for (EventHandler<Event> handler: listofHandlers) {
265        handler.handle(event);
266      }
267    }
268
269    void addHandler(EventHandler<Event> handler) {
270      listofHandlers.add(handler);
271    }
272
273  }
274}