001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.service.ConfigurationService;
022
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.LinkedHashMap;
028import java.util.LinkedHashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.concurrent.locks.Lock;
037import java.util.concurrent.locks.ReentrantLock;
038
039/**
040 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p/> All
041 * instrumentation elements have a group and a name.
042 */
043public class Instrumentation {
044    private ScheduledExecutorService scheduler;
045    private Lock counterLock;
046    private Lock timerLock;
047    private Lock variableLock;
048    private Lock samplerLock;
049    private Configuration configuration;
050    private Map<String, Map<String, Map<String, Object>>> all;
051    private Map<String, Map<String, Element<Long>>> counters;
052    private Map<String, Map<String, Element<Timer>>> timers;
053    private Map<String, Map<String, Element<Variable>>> variables;
054    private Map<String, Map<String, Element<Double>>> samplers;
055
056    /**
057     * Instrumentation constructor.
058     */
059    @SuppressWarnings("unchecked")
060    public Instrumentation() {
061        counterLock = new ReentrantLock();
062        timerLock = new ReentrantLock();
063        variableLock = new ReentrantLock();
064        samplerLock = new ReentrantLock();
065        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
066        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
067        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
068        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
069        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
070        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
071        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
072        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
073        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
074    }
075
076    /**
077     * Set the scheduler instance to handle the samplers.
078     *
079     * @param scheduler scheduler instance.
080     */
081    public void setScheduler(ScheduledExecutorService scheduler) {
082        this.scheduler = scheduler;
083    }
084
085    /**
086     * Cron is a stopwatch that can be started/stopped several times. <p/> This class is not thread safe, it does not
087     * need to be. <p/> It keeps track of the total time (first start to last stop) and the running time (total time
088     * minus the stopped intervals). <p/> Once a Cron is complete it must be added to the corresponding group/name in a
089     * Instrumentation instance.
090     */
091    public static class Cron {
092        private long start;
093        private long end;
094        private long lapStart;
095        private long own;
096        private long total;
097        private boolean running;
098
099        /**
100         * Creates new Cron, stopped, in zero.
101         */
102        public Cron() {
103            running = false;
104        }
105
106        /**
107         * Start the cron. It cannot be already started.
108         */
109        public void start() {
110            if (!running) {
111                if (lapStart == 0) {
112                    lapStart = System.currentTimeMillis();
113                    if (start == 0) {
114                        start = lapStart;
115                        end = start;
116                    }
117                }
118                running = true;
119            }
120        }
121
122        /**
123         * Stops the cron. It cannot be already stopped.
124         */
125        public void stop() {
126            if (running) {
127                end = System.currentTimeMillis();
128                if (start == 0) {
129                    start = end;
130                }
131                total = end - start;
132                if (lapStart > 0) {
133                    own += end - lapStart;
134                    lapStart = 0;
135                }
136                running = false;
137            }
138        }
139
140        /**
141         * Return the start time of the cron. It must be stopped.
142         *
143         * @return the start time of the cron.
144         */
145        public long getStart() {
146            if (running) {
147                throw new IllegalStateException("Timer running");
148            }
149            return start;
150        }
151
152        /**
153         * Return the end time of the cron.  It must be stopped.
154         *
155         * @return the end time of the cron.
156         */
157        public long getEnd() {
158            if (running) {
159                throw new IllegalStateException("Timer running");
160            }
161            return end;
162        }
163
164        /**
165         * Return the total time of the cron. It must be stopped.
166         *
167         * @return the total time of the cron.
168         */
169        public long getTotal() {
170            if (running) {
171                throw new IllegalStateException("Timer running");
172            }
173            return total;
174        }
175
176        /**
177         * Return the own time of the cron. It must be stopped.
178         *
179         * @return the own time of the cron.
180         */
181        public long getOwn() {
182            if (running) {
183                throw new IllegalStateException("Timer running");
184            }
185            return own;
186        }
187
188    }
189
190    /**
191     * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p/> Instrumentation element snapshots
192     * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} ()} methods.
193     */
194    public interface Element<T> {
195
196        /**
197         * Return the snapshot value of the Intrumentation element.
198         *
199         * @return the snapshot value of the Intrumentation element.
200         */
201        T getValue();
202    }
203
204    /**
205     * Counter Instrumentation element.
206     */
207    private static class Counter extends AtomicLong implements Element<Long> {
208
209        /**
210         * Return the counter snapshot.
211         *
212         * @return the counter snapshot.
213         */
214        public Long getValue() {
215            return get();
216        }
217
218        /**
219         * Return the String representation of the counter value.
220         *
221         * @return the String representation of the counter value.
222         */
223        public String toString() {
224            return Long.toString(get());
225        }
226
227    }
228
229    /**
230     * Timer Instrumentation element.
231     */
232    public static class Timer implements Element<Timer> {
233        Lock lock = new ReentrantLock();
234        private long ownTime;
235        private long totalTime;
236        private long ticks;
237        private long ownSquareTime;
238        private long totalSquareTime;
239        private long ownMinTime;
240        private long ownMaxTime;
241        private long totalMinTime;
242        private long totalMaxTime;
243
244        /**
245         * Timer constructor. <p/> It is project private for test purposes.
246         */
247        Timer() {
248        }
249
250        /**
251         * Return the String representation of the timer value.
252         *
253         * @return the String representation of the timer value.
254         */
255        public String toString() {
256            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
257        }
258
259        /**
260         * Return the timer snapshot.
261         *
262         * @return the timer snapshot.
263         */
264        public Timer getValue() {
265            try {
266                lock.lock();
267                Timer timer = new Timer();
268                timer.ownTime = ownTime;
269                timer.totalTime = totalTime;
270                timer.ticks = ticks;
271                timer.ownSquareTime = ownSquareTime;
272                timer.totalSquareTime = totalSquareTime;
273                timer.ownMinTime = ownMinTime;
274                timer.ownMaxTime = ownMaxTime;
275                timer.totalMinTime = totalMinTime;
276                timer.totalMaxTime = totalMaxTime;
277                return timer;
278            }
279            finally {
280                lock.unlock();
281            }
282        }
283
284        /**
285         * Add a cron to a timer. <p/> It is project private for test purposes.
286         *
287         * @param cron Cron to add.
288         */
289        void addCron(Cron cron) {
290            try {
291                lock.lock();
292                long own = cron.getOwn();
293                long total = cron.getTotal();
294                ownTime += own;
295                totalTime += total;
296                ticks++;
297                ownSquareTime += own * own;
298                totalSquareTime += total * total;
299                if (ticks == 1) {
300                    ownMinTime = own;
301                    ownMaxTime = own;
302                    totalMinTime = total;
303                    totalMaxTime = total;
304                }
305                else {
306                    ownMinTime = Math.min(ownMinTime, own);
307                    ownMaxTime = Math.max(ownMaxTime, own);
308                    totalMinTime = Math.min(totalMinTime, total);
309                    totalMaxTime = Math.max(totalMaxTime, total);
310                }
311            }
312            finally {
313                lock.unlock();
314            }
315        }
316
317        /**
318         * Return the own accumulated computing time by the timer.
319         *
320         * @return own accumulated computing time by the timer.
321         */
322        public long getOwn() {
323            return ownTime;
324        }
325
326        /**
327         * Return the total accumulated computing time by the timer.
328         *
329         * @return total accumulated computing time by the timer.
330         */
331        public long getTotal() {
332            return totalTime;
333        }
334
335        /**
336         * Return the number of times a cron was added to the timer.
337         *
338         * @return the number of times a cron was added to the timer.
339         */
340        public long getTicks() {
341            return ticks;
342        }
343
344        /**
345         * Return the sum of the square own times. <p/> It can be used to calculate the standard deviation.
346         *
347         * @return the sum of the square own timer.
348         */
349        public long getOwnSquareSum() {
350            return ownSquareTime;
351        }
352
353        /**
354         * Return the sum of the square total times. <p/> It can be used to calculate the standard deviation.
355         *
356         * @return the sum of the square own timer.
357         */
358        public long getTotalSquareSum() {
359            return totalSquareTime;
360        }
361
362        /**
363         * Returns the own minimum time.
364         *
365         * @return the own minimum time.
366         */
367        public long getOwnMin() {
368            return ownMinTime;
369        }
370
371        /**
372         * Returns the own maximum time.
373         *
374         * @return the own maximum time.
375         */
376        public long getOwnMax() {
377            return ownMaxTime;
378        }
379
380        /**
381         * Returns the total minimum time.
382         *
383         * @return the total minimum time.
384         */
385        public long getTotalMin() {
386            return totalMinTime;
387        }
388
389        /**
390         * Returns the total maximum time.
391         *
392         * @return the total maximum time.
393         */
394        public long getTotalMax() {
395            return totalMaxTime;
396        }
397
398        /**
399         * Returns the own average time.
400         *
401         * @return the own average time.
402         */
403        public long getOwnAvg() {
404            return (ticks != 0) ? ownTime / ticks : 0;
405        }
406
407        /**
408         * Returns the total average time.
409         *
410         * @return the total average time.
411         */
412        public long getTotalAvg() {
413            return (ticks != 0) ? totalTime / ticks : 0;
414        }
415
416        /**
417         * Returns the total time standard deviation.
418         *
419         * @return the total time standard deviation.
420         */
421        public double getTotalStdDev() {
422            return evalStdDev(ticks, totalTime, totalSquareTime);
423        }
424
425        /**
426         * Returns the own time standard deviation.
427         *
428         * @return the own time standard deviation.
429         */
430        public double getOwnStdDev() {
431            return evalStdDev(ticks, ownTime, ownSquareTime);
432        }
433
434        private double evalStdDev(long n, long sn, long ssn) {
435            return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
436        }
437
438    }
439
440    /**
441     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/> This method is thread
442     * safe.
443     *
444     * @param group timer group.
445     * @param name timer name.
446     * @param cron cron to add to the timer.
447     */
448    public void addCron(String group, String name, Cron cron) {
449        Map<String, Element<Timer>> map = timers.get(group);
450        if (map == null) {
451            try {
452                timerLock.lock();
453                map = timers.get(group);
454                if (map == null) {
455                    map = new HashMap<String, Element<Timer>>();
456                    timers.put(group, map);
457                }
458            }
459            finally {
460                timerLock.unlock();
461            }
462        }
463        Timer timer = (Timer) map.get(name);
464        if (timer == null) {
465            try {
466                timerLock.lock();
467                timer = (Timer) map.get(name);
468                if (timer == null) {
469                    timer = new Timer();
470                    map.put(name, timer);
471                }
472            }
473            finally {
474                timerLock.unlock();
475            }
476        }
477        timer.addCron(cron);
478    }
479
480    /**
481     * Increment an instrumentation counter. The counter is created if it does not exists. <p/> This method is thread
482     * safe.
483     *
484     * @param group counter group.
485     * @param name counter name.
486     * @param count increment to add to the counter.
487     */
488    public void incr(String group, String name, long count) {
489        Map<String, Element<Long>> map = counters.get(group);
490        if (map == null) {
491            try {
492                counterLock.lock();
493                map = counters.get(group);
494                if (map == null) {
495                    map = new HashMap<String, Element<Long>>();
496                    counters.put(group, map);
497                }
498            }
499            finally {
500                counterLock.unlock();
501            }
502        }
503        Counter counter = (Counter) map.get(name);
504        if (counter == null) {
505            try {
506                counterLock.lock();
507                counter = (Counter) map.get(name);
508                if (counter == null) {
509                    counter = new Counter();
510                    map.put(name, counter);
511                }
512            }
513            finally {
514                counterLock.unlock();
515            }
516        }
517        counter.addAndGet(count);
518    }
519
520    /**
521     * Interface for instrumentation variables. <p/> For example a the database service could expose the number of
522     * currently active connections.
523     */
524    public interface Variable<T> extends Element<T> {
525    }
526
527    /**
528     * Add an instrumentation variable. The variable must not exist. <p/> This method is thread safe.
529     *
530     * @param group counter group.
531     * @param name counter name.
532     * @param variable variable to add.
533     */
534    @SuppressWarnings("unchecked")
535    public void addVariable(String group, String name, Variable variable) {
536        Map<String, Element<Variable>> map = variables.get(group);
537        if (map == null) {
538            try {
539                variableLock.lock();
540                map = variables.get(group);
541                if (map == null) {
542                    map = new HashMap<String, Element<Variable>>();
543                    variables.put(group, map);
544                }
545            }
546            finally {
547                variableLock.unlock();
548            }
549        }
550        if (map.containsKey(name)) {
551            throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
552        }
553        map.put(name, variable);
554    }
555
556    /**
557     * Set the system configuration.
558     *
559     * @param configuration system configuration.
560     */
561    public void setConfiguration(Configuration configuration) {
562        this.configuration = configuration;
563    }
564
565    /**
566     * Return the JVM system properties.
567     *
568     * @return JVM system properties.
569     */
570    @SuppressWarnings("unchecked")
571    public Map<String, String> getJavaSystemProperties() {
572        return (Map<String, String>) (Object) System.getProperties();
573    }
574
575    /**
576     * Return the OS environment used to start Oozie.
577     *
578     * @return the OS environment used to start Oozie.
579     */
580    public Map<String, String> getOSEnv() {
581        return System.getenv();
582    }
583
584    /**
585     * Return the current system configuration as a Map<String,String>.
586     *
587     * @return the current system configuration as a Map<String,String>.
588     */
589    public Map<String, String> getConfiguration() {
590        final Configuration maskedConf = ConfigurationService.maskPasswords(configuration);
591
592        return new Map<String, String>() {
593            public int size() {
594                return maskedConf.size();
595            }
596
597            public boolean isEmpty() {
598                return maskedConf.size() == 0;
599            }
600
601            public boolean containsKey(Object o) {
602                return maskedConf.get((String) o) != null;
603            }
604
605            public boolean containsValue(Object o) {
606                throw new UnsupportedOperationException();
607            }
608
609            public String get(Object o) {
610                return maskedConf.get((String) o);
611            }
612
613            public String put(String s, String s1) {
614                throw new UnsupportedOperationException();
615            }
616
617            public String remove(Object o) {
618                throw new UnsupportedOperationException();
619            }
620
621            public void putAll(Map<? extends String, ? extends String> map) {
622                throw new UnsupportedOperationException();
623            }
624
625            public void clear() {
626                throw new UnsupportedOperationException();
627            }
628
629            public Set<String> keySet() {
630                Set<String> set = new LinkedHashSet<String>();
631                for (Entry<String, String> entry : maskedConf) {
632                    set.add(entry.getKey());
633                }
634                return set;
635            }
636
637            public Collection<String> values() {
638                Set<String> set = new LinkedHashSet<String>();
639                for (Entry<String, String> entry : maskedConf) {
640                    set.add(entry.getValue());
641                }
642                return set;
643            }
644
645            public Set<Entry<String, String>> entrySet() {
646                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
647                for (Entry<String, String> entry : maskedConf) {
648                    set.add(entry);
649                }
650                return set;
651            }
652        };
653    }
654
655    /**
656     * Return all the counters. <p/> This method is thread safe. <p/> The counters are live. The counter value is a
657     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
658     *
659     * @return all counters.
660     */
661    public Map<String, Map<String, Element<Long>>> getCounters() {
662        return counters;
663    }
664
665    /**
666     * Return all the timers. <p/> This method is thread safe. <p/> The timers are live. Once a timer is obtained, all
667     * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
668     * invoked.
669     *
670     * @return all counters.
671     */
672    public Map<String, Map<String, Element<Timer>>> getTimers() {
673        return timers;
674    }
675
676    /**
677     * Return all the variables. <p/> This method is thread safe. <p/> The variables are live. The variable value is a
678     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
679     *
680     * @return all counters.
681     */
682    public Map<String, Map<String, Element<Variable>>> getVariables() {
683        return variables;
684    }
685
686    /**
687     * Return a map containing all variables, counters and timers.
688     *
689     * @return a map containing all variables, counters and timers.
690     */
691    public Map<String, Map<String, Map<String, Object>>> getAll() {
692        return all;
693    }
694
695    /**
696     * Return the string representation of the instrumentation.
697     *
698     * @return the string representation of the instrumentation.
699     */
700    public String toString() {
701        String E = System.getProperty("line.separator");
702        StringBuilder sb = new StringBuilder(4096);
703        for (String element : all.keySet()) {
704            sb.append(element).append(':').append(E);
705            List<String> groups = new ArrayList<String>(all.get(element).keySet());
706            Collections.sort(groups);
707            for (String group : groups) {
708                sb.append("  ").append(group).append(':').append(E);
709                List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
710                Collections.sort(names);
711                for (String name : names) {
712                    sb.append("    ").append(name).append(": ").append(((Element) all.get(element).
713                            get(group).get(name)).getValue()).append(E);
714                }
715            }
716        }
717        return sb.toString();
718    }
719
720    private static class Sampler implements Element<Double>, Runnable {
721        private Lock lock = new ReentrantLock();
722        private int samplingInterval;
723        private Variable<Long> variable;
724        private long[] values;
725        private int current;
726        private long valuesSum;
727        private double rate;
728
729        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
730            this.samplingInterval = samplingInterval;
731            this.variable = variable;
732            values = new long[samplingPeriod / samplingInterval];
733            valuesSum = 0;
734            current = -1;
735        }
736
737        public int getSamplingInterval() {
738            return samplingInterval;
739        }
740
741        public void run() {
742            try {
743                lock.lock();
744                long newValue = variable.getValue();
745                if (current == -1) {
746                    valuesSum = newValue;
747                    current = 0;
748                    values[current] = newValue;
749                }
750                else {
751                    current = (current + 1) % values.length;
752                    valuesSum = valuesSum - values[current] + newValue;
753                    values[current] = newValue;
754                }
755                rate = ((double) valuesSum) / values.length;
756            }
757            finally {
758                lock.unlock();
759            }
760        }
761
762        public Double getValue() {
763            return rate;
764        }
765    }
766
767    /**
768     * Add a sampling variable. <p/> This method is thread safe.
769     *
770     * @param group timer group.
771     * @param name timer name.
772     * @param period sampling period to compute rate.
773     * @param interval sampling frequency, how often the variable is probed.
774     * @param variable variable to sample.
775     */
776    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
777        if (scheduler == null) {
778            throw new IllegalStateException("scheduler not set, cannot sample");
779        }
780        try {
781            samplerLock.lock();
782            Map<String, Element<Double>> map = samplers.get(group);
783            if (map == null) {
784                map = samplers.get(group);
785                if (map == null) {
786                    map = new HashMap<String, Element<Double>>();
787                    samplers.put(group, map);
788                }
789            }
790            if (map.containsKey(name)) {
791                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
792            }
793            Sampler sampler = new Sampler(period, interval, variable);
794            map.put(name, sampler);
795            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
796        }
797        finally {
798            samplerLock.unlock();
799        }
800    }
801
802    /**
803     * Return all the samplers. <p/> This method is thread safe. <p/> The samplers are live. The sampler value is a
804     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
805     *
806     * @return all counters.
807     */
808    public Map<String, Map<String, Element<Double>>> getSamplers() {
809        return samplers;
810    }
811
812}