/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.hadoop.lib.service.instrumentation;
020
021import org.apache.hadoop.classification.InterfaceAudience;
022import org.apache.hadoop.lib.server.BaseService;
023import org.apache.hadoop.lib.server.ServiceException;
024import org.apache.hadoop.lib.service.Instrumentation;
025import org.apache.hadoop.lib.service.Scheduler;
026import org.apache.hadoop.util.Time;
027import org.json.simple.JSONAware;
028import org.json.simple.JSONObject;
029import org.json.simple.JSONStreamAware;
030
031import java.io.IOException;
032import java.io.Writer;
033import java.util.ArrayList;
034import java.util.LinkedHashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicLong;
040import java.util.concurrent.locks.Lock;
041import java.util.concurrent.locks.ReentrantLock;
042
043@InterfaceAudience.Private
044public class InstrumentationService extends BaseService implements Instrumentation {
045  public static final String PREFIX = "instrumentation";
046  public static final String CONF_TIMERS_SIZE = "timers.size";
047
048  private int timersSize;
049  private Lock counterLock;
050  private Lock timerLock;
051  private Lock variableLock;
052  private Lock samplerLock;
053  private Map<String, Map<String, AtomicLong>> counters;
054  private Map<String, Map<String, Timer>> timers;
055  private Map<String, Map<String, VariableHolder>> variables;
056  private Map<String, Map<String, Sampler>> samplers;
057  private List<Sampler> samplersList;
058  private Map<String, Map<String, ?>> all;
059
060  public InstrumentationService() {
061    super(PREFIX);
062  }
063
064  @Override
065  @SuppressWarnings("unchecked")
066  public void init() throws ServiceException {
067    timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
068    counterLock = new ReentrantLock();
069    timerLock = new ReentrantLock();
070    variableLock = new ReentrantLock();
071    samplerLock = new ReentrantLock();
072    Map<String, VariableHolder> jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
073    counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
074    timers = new ConcurrentHashMap<String, Map<String, Timer>>();
075    variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
076    samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
077    samplersList = new ArrayList<Sampler>();
078    all = new LinkedHashMap<String, Map<String, ?>>();
079    all.put("os-env", System.getenv());
080    all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
081    all.put("jvm", jvmVariables);
082    all.put("counters", (Map) counters);
083    all.put("timers", (Map) timers);
084    all.put("variables", (Map) variables);
085    all.put("samplers", (Map) samplers);
086
087    jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
088      @Override
089      public Long getValue() {
090        return Runtime.getRuntime().freeMemory();
091      }
092    }));
093    jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
094      @Override
095      public Long getValue() {
096        return Runtime.getRuntime().maxMemory();
097      }
098    }));
099    jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
100      @Override
101      public Long getValue() {
102        return Runtime.getRuntime().totalMemory();
103      }
104    }));
105  }
106
107  @Override
108  public void postInit() throws ServiceException {
109    Scheduler scheduler = getServer().get(Scheduler.class);
110    if (scheduler != null) {
111      scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
112    }
113  }
114
115  @Override
116  public Class getInterface() {
117    return Instrumentation.class;
118  }
119
120  @SuppressWarnings("unchecked")
121  private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
122    boolean locked = false;
123    try {
124      Map<String, T> groupMap = map.get(group);
125      if (groupMap == null) {
126        lock.lock();
127        locked = true;
128        groupMap = map.get(group);
129        if (groupMap == null) {
130          groupMap = new ConcurrentHashMap<String, T>();
131          map.put(group, groupMap);
132        }
133      }
134      T element = groupMap.get(name);
135      if (element == null) {
136        if (!locked) {
137          lock.lock();
138          locked = true;
139        }
140        element = groupMap.get(name);
141        if (element == null) {
142          try {
143            if (klass == Timer.class) {
144              element = (T) new Timer(timersSize);
145            } else {
146              element = klass.newInstance();
147            }
148          } catch (Exception ex) {
149            throw new RuntimeException(ex);
150          }
151          groupMap.put(name, element);
152        }
153      }
154      return element;
155    } finally {
156      if (locked) {
157        lock.unlock();
158      }
159    }
160  }
161
162  static class Cron implements Instrumentation.Cron {
163    long start;
164    long lapStart;
165    long own;
166    long total;
167
168    @Override
169    public Cron start() {
170      if (total != 0) {
171        throw new IllegalStateException("Cron already used");
172      }
173      if (start == 0) {
174        start = Time.now();
175        lapStart = start;
176      } else if (lapStart == 0) {
177        lapStart = Time.now();
178      }
179      return this;
180    }
181
182    @Override
183    public Cron stop() {
184      if (total != 0) {
185        throw new IllegalStateException("Cron already used");
186      }
187      if (lapStart > 0) {
188        own += Time.now() - lapStart;
189        lapStart = 0;
190      }
191      return this;
192    }
193
194    void end() {
195      stop();
196      total = Time.now() - start;
197    }
198
199  }
200
201  static class Timer implements JSONAware, JSONStreamAware {
202    static final int LAST_TOTAL = 0;
203    static final int LAST_OWN = 1;
204    static final int AVG_TOTAL = 2;
205    static final int AVG_OWN = 3;
206
207    Lock lock = new ReentrantLock();
208    private long[] own;
209    private long[] total;
210    private int last;
211    private boolean full;
212    private int size;
213
214    public Timer(int size) {
215      this.size = size;
216      own = new long[size];
217      total = new long[size];
218      for (int i = 0; i < size; i++) {
219        own[i] = -1;
220        total[i] = -1;
221      }
222      last = -1;
223    }
224
225    long[] getValues() {
226      lock.lock();
227      try {
228        long[] values = new long[4];
229        values[LAST_TOTAL] = total[last];
230        values[LAST_OWN] = own[last];
231        int limit = (full) ? size : (last + 1);
232        for (int i = 0; i < limit; i++) {
233          values[AVG_TOTAL] += total[i];
234          values[AVG_OWN] += own[i];
235        }
236        values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
237        values[AVG_OWN] = values[AVG_OWN] / limit;
238        return values;
239      } finally {
240        lock.unlock();
241      }
242    }
243
244    void addCron(Cron cron) {
245      cron.end();
246      lock.lock();
247      try {
248        last = (last + 1) % size;
249        full = full || last == (size - 1);
250        total[last] = cron.total;
251        own[last] = cron.own;
252      } finally {
253        lock.unlock();
254      }
255    }
256
257    @SuppressWarnings("unchecked")
258    private JSONObject getJSON() {
259      long[] values = getValues();
260      JSONObject json = new JSONObject();
261      json.put("lastTotal", values[0]);
262      json.put("lastOwn", values[1]);
263      json.put("avgTotal", values[2]);
264      json.put("avgOwn", values[3]);
265      return json;
266    }
267
268    @Override
269    public String toJSONString() {
270      return getJSON().toJSONString();
271    }
272
273    @Override
274    public void writeJSONString(Writer out) throws IOException {
275      getJSON().writeJSONString(out);
276    }
277
278  }
279
280  @Override
281  public Cron createCron() {
282    return new Cron();
283  }
284
285  @Override
286  public void incr(String group, String name, long count) {
287    AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
288    counter.addAndGet(count);
289  }
290
291  @Override
292  public void addCron(String group, String name, Instrumentation.Cron cron) {
293    Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
294    timer.addCron((Cron) cron);
295  }
296
297  static class VariableHolder<E> implements JSONAware, JSONStreamAware {
298    Variable<E> var;
299
300    public VariableHolder() {
301    }
302
303    public VariableHolder(Variable<E> var) {
304      this.var = var;
305    }
306
307    @SuppressWarnings("unchecked")
308    private JSONObject getJSON() {
309      JSONObject json = new JSONObject();
310      json.put("value", var.getValue());
311      return json;
312    }
313
314    @Override
315    public String toJSONString() {
316      return getJSON().toJSONString();
317    }
318
319    @Override
320    public void writeJSONString(Writer out) throws IOException {
321      out.write(toJSONString());
322    }
323
324  }
325
326  @Override
327  public void addVariable(String group, String name, Variable<?> variable) {
328    VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
329    holder.var = variable;
330  }
331
332  static class Sampler implements JSONAware, JSONStreamAware {
333    Variable<Long> variable;
334    long[] values;
335    private AtomicLong sum;
336    private int last;
337    private boolean full;
338
339    void init(int size, Variable<Long> variable) {
340      this.variable = variable;
341      values = new long[size];
342      sum = new AtomicLong();
343      last = 0;
344    }
345
346    void sample() {
347      int index = last;
348      long valueGoingOut = values[last];
349      full = full || last == (values.length - 1);
350      last = (last + 1) % values.length;
351      values[index] = variable.getValue();
352      sum.addAndGet(-valueGoingOut + values[index]);
353    }
354
355    double getRate() {
356      return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
357    }
358
359    @SuppressWarnings("unchecked")
360    private JSONObject getJSON() {
361      JSONObject json = new JSONObject();
362      json.put("sampler", getRate());
363      json.put("size", (full) ? values.length : last);
364      return json;
365    }
366
367    @Override
368    public String toJSONString() {
369      return getJSON().toJSONString();
370    }
371
372    @Override
373    public void writeJSONString(Writer out) throws IOException {
374      out.write(toJSONString());
375    }
376  }
377
378  @Override
379  public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
380    Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
381    samplerLock.lock();
382    try {
383      sampler.init(samplingSize, variable);
384      samplersList.add(sampler);
385    } finally {
386      samplerLock.unlock();
387    }
388  }
389
390  class SamplersRunnable implements Runnable {
391
392    @Override
393    public void run() {
394      samplerLock.lock();
395      try {
396        for (Sampler sampler : samplersList) {
397          sampler.sample();
398        }
399      } finally {
400        samplerLock.unlock();
401      }
402    }
403  }
404
405  @Override
406  public Map<String, Map<String, ?>> getSnapshot() {
407    return all;
408  }
409
410
411}