001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.util;
020
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience.Public;
028import org.apache.hadoop.classification.InterfaceStability.Evolving;
029import org.apache.hadoop.service.AbstractService;
030
031/**
032 * A simple liveliness monitor with which clients can register, trust the
033 * component to monitor liveliness, get a call-back on expiry and then finally
034 * unregister.
035 */
036@Public
037@Evolving
038public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
039
040  private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);
041
042  //thread which runs periodically to see the last time since a heartbeat is
043  //received.
044  private Thread checkerThread;
045  private volatile boolean stopped;
046  public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins
047  private int expireInterval = DEFAULT_EXPIRE;
048  private int monitorInterval = expireInterval/3;
049
050  private final Clock clock;
051
052  private Map<O, Long> running = new HashMap<O, Long>();
053
054  public AbstractLivelinessMonitor(String name, Clock clock) {
055    super(name);
056    this.clock = clock;
057  }
058
059  @Override
060  protected void serviceStart() throws Exception {
061    assert !stopped : "starting when already stopped";
062    checkerThread = new Thread(new PingChecker());
063    checkerThread.setName("Ping Checker");
064    checkerThread.start();
065    super.serviceStart();
066  }
067
068  @Override
069  protected void serviceStop() throws Exception {
070    stopped = true;
071    if (checkerThread != null) {
072      checkerThread.interrupt();
073    }
074    super.serviceStop();
075  }
076
077  protected abstract void expire(O ob);
078
079  protected void setExpireInterval(int expireInterval) {
080    this.expireInterval = expireInterval;
081  }
082
083  protected void setMonitorInterval(int monitorInterval) {
084    this.monitorInterval = monitorInterval;
085  }
086
087  public synchronized void receivedPing(O ob) {
088    //only put for the registered objects
089    if (running.containsKey(ob)) {
090      running.put(ob, clock.getTime());
091    }
092  }
093
094  public synchronized void register(O ob) {
095    running.put(ob, clock.getTime());
096  }
097
098  public synchronized void unregister(O ob) {
099    running.remove(ob);
100  }
101
102  private class PingChecker implements Runnable {
103
104    @Override
105    public void run() {
106      while (!stopped && !Thread.currentThread().isInterrupted()) {
107        synchronized (AbstractLivelinessMonitor.this) {
108          Iterator<Map.Entry<O, Long>> iterator = 
109            running.entrySet().iterator();
110
111          //avoid calculating current time everytime in loop
112          long currentTime = clock.getTime();
113
114          while (iterator.hasNext()) {
115            Map.Entry<O, Long> entry = iterator.next();
116            if (currentTime > entry.getValue() + expireInterval) {
117              iterator.remove();
118              expire(entry.getKey());
119              LOG.info("Expired:" + entry.getKey().toString() + 
120                      " Timed out after " + expireInterval/1000 + " secs");
121            }
122          }
123        }
124        try {
125          Thread.sleep(monitorInterval);
126        } catch (InterruptedException e) {
127          LOG.info(getName() + " thread interrupted");
128          break;
129        }
130      }
131    }
132  }
133
134}