001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.LinkedHashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.Map.Entry;
030import java.util.concurrent.BlockingQueue;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.RejectedExecutionException;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
039import org.apache.oozie.util.Instrumentable;
040import org.apache.oozie.util.Instrumentation;
041import org.apache.oozie.util.PollablePriorityDelayQueue;
042import org.apache.oozie.util.PriorityDelayQueue;
043import org.apache.oozie.util.XCallable;
044import org.apache.oozie.util.XLog;
045import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
046
047/**
048 * The callable queue service queues {@link XCallable}s for asynchronous execution.
049 * <p/>
050 * Callables can be queued for immediate execution or for delayed execution (some time in the future).
051 * <p/>
052 * Callables are consumed from the queue for execution based on their priority.
053 * <p/>
054 * When the queues (for immediate execution and for delayed execution) are full, the callable queue service stops
055 * queuing callables.
056 * <p/>
057 * A thread-pool is used to execute the callables asynchronously.
058 * <p/>
059 * The following configuration parameters control the callable queue service:
060 * <p/>
061 * {@link #CONF_QUEUE_SIZE} size of the immediate execution queue. Defaulf value is 10000.
062 * <p/>
063 * {@link #CONF_THREADS} number of threads in the thread-pool used for asynchronous command execution. When this number
064 * of threads is reached, commands remain the queue until threads become available. Sets up a priority queue for the
065 * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution
066 * sometime in the future.
067 */
068public class CallableQueueService implements Service, Instrumentable {
069    private static final String INSTRUMENTATION_GROUP = "callablequeue";
070    private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue";
071    private static final String INSTR_EXECUTED_COUNTER = "executed";
072    private static final String INSTR_FAILED_COUNTER = "failed";
073    private static final String INSTR_QUEUED_COUNTER = "queued";
074    private static final String INSTR_QUEUE_SIZE_SAMPLER = "queue.size";
075    private static final String INSTR_THREADS_ACTIVE_SAMPLER = "threads.active";
076
077    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CallableQueueService.";
078
079    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
080    public static final String CONF_THREADS = CONF_PREFIX + "threads";
081    public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency";
082    public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible";
083    public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes";
084    public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize";
085
086    public static final int CONCURRENCY_DELAY = 500;
087
088    public static final int SAFE_MODE_DELAY = 60000;
089
090    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
091
092    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
093
094    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
095
096    public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
097
098    private int interruptMapMaxSize;
099
100    private int maxCallableConcurrency;
101
102    private boolean callableBegin(XCallable<?> callable) {
103        synchronized (activeCallables) {
104            AtomicInteger counter = activeCallables.get(callable.getType());
105            if (counter == null) {
106                counter = new AtomicInteger(1);
107                activeCallables.put(callable.getType(), counter);
108                return true;
109            }
110            else {
111                int i = counter.incrementAndGet();
112                return i <= maxCallableConcurrency;
113            }
114        }
115    }
116
117    private void callableEnd(XCallable<?> callable) {
118        synchronized (activeCallables) {
119            AtomicInteger counter = activeCallables.get(callable.getType());
120            if (counter == null) {
121                throw new IllegalStateException("It should not happen");
122            }
123            else {
124                counter.decrementAndGet();
125            }
126        }
127    }
128
129    private boolean callableReachMaxConcurrency(XCallable<?> callable) {
130        synchronized (activeCallables) {
131            AtomicInteger counter = activeCallables.get(callable.getType());
132            if (counter == null) {
133                return true;
134            }
135            else {
136                int i = counter.get();
137                return i < maxCallableConcurrency;
138            }
139        }
140    }
141
142    // Callables are wrapped with the this wrapper for execution, for logging
143    // and instrumentation.
144    // The wrapper implements Runnable and Comparable to be able to work with an
145    // executor and a priority queue.
146    class CallableWrapper extends PriorityDelayQueue.QueueElement<XCallable<?>> implements Runnable {
147        private Instrumentation.Cron cron;
148
149        public CallableWrapper(XCallable<?> callable, long delay) {
150            super(callable, callable.getPriority(), delay, TimeUnit.MILLISECONDS);
151            cron = new Instrumentation.Cron();
152            cron.start();
153        }
154
155        public void run() {
156            if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
157                log.info("Oozie is in SAFEMODE, requeuing callable [{0}] with [{1}]ms delay", getElement().getType(),
158                        SAFE_MODE_DELAY);
159                setDelay(SAFE_MODE_DELAY, TimeUnit.MILLISECONDS);
160                removeFromUniqueCallables();
161                queue(this, true);
162                return;
163            }
164            XCallable<?> callable = getElement();
165            try {
166                if (callableBegin(callable)) {
167                    cron.stop();
168                    addInQueueCron(cron);
169                    XLog.Info.get().clear();
170                    XLog log = XLog.getLog(getClass());
171                    log.trace("executing callable [{0}]", callable.getName());
172
173                    removeFromUniqueCallables();
174                    try {
175                        callable.call();
176                        incrCounter(INSTR_EXECUTED_COUNTER, 1);
177                        log.trace("executed callable [{0}]", callable.getName());
178                    }
179                    catch (Exception ex) {
180                        incrCounter(INSTR_FAILED_COUNTER, 1);
181                        log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
182                    }
183                    finally {
184                        XLog.Info.get().clear();
185                    }
186                }
187                else {
188                    log.warn("max concurrency for callable [{0}] exceeded, requeueing with [{1}]ms delay", callable
189                            .getType(), CONCURRENCY_DELAY);
190                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
191                    removeFromUniqueCallables();
192                    queue(this, true);
193                    incrCounter(callable.getType() + "#exceeded.concurrency", 1);
194                }
195            }
196            finally {
197                callableEnd(callable);
198            }
199        }
200
201        /**
202         * @return String the queue dump
203         */
204        @Override
205        public String toString() {
206            return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString();
207        }
208
209        /**
210         * Filter the duplicate callables from the list before queue this.
211         * <p/>
212         * If it is single callable, checking if key is in unique map or not.
213         * <p/>
214         * If it is composite callable, remove duplicates callables from the composite.
215         *
216         * @return true if this callable should be queued
217         */
218        public boolean filterDuplicates() {
219            XCallable<?> callable = getElement();
220            if (callable instanceof CompositeCallable) {
221                return ((CompositeCallable) callable).removeDuplicates();
222            }
223            else {
224                return uniqueCallables.containsKey(callable.getKey()) == false;
225            }
226        }
227
228        /**
229         * Add the keys to the set
230         */
231        public void addToUniqueCallables() {
232            XCallable<?> callable = getElement();
233            if (callable instanceof CompositeCallable) {
234                ((CompositeCallable) callable).addToUniqueCallables();
235            }
236            else {
237                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
238            }
239        }
240
241        /**
242         * Remove the keys from the set
243         */
244        public void removeFromUniqueCallables() {
245            XCallable<?> callable = getElement();
246            if (callable instanceof CompositeCallable) {
247                ((CompositeCallable) callable).removeFromUniqueCallables();
248            }
249            else {
250                uniqueCallables.remove(callable.getKey());
251            }
252        }
253    }
254
255    class CompositeCallable implements XCallable<Void> {
256        private List<XCallable<?>> callables;
257        private String name;
258        private int priority;
259        private long createdTime;
260
261        public CompositeCallable(List<? extends XCallable<?>> callables) {
262            this.callables = new ArrayList<XCallable<?>>(callables);
263            priority = 0;
264            createdTime = Long.MAX_VALUE;
265            StringBuilder sb = new StringBuilder();
266            String separator = "[";
267            for (XCallable<?> callable : callables) {
268                priority = Math.max(priority, callable.getPriority());
269                createdTime = Math.min(createdTime, callable.getCreatedTime());
270                sb.append(separator).append(callable.getName());
271                separator = ",";
272            }
273            sb.append("]");
274            name = sb.toString();
275        }
276
277        @Override
278        public String getName() {
279            return name;
280        }
281
282        @Override
283        public String getType() {
284            return "#composite#" + callables.get(0).getType();
285        }
286
287        @Override
288        public String getKey() {
289            return "#composite#" + callables.get(0).getKey();
290        }
291
292        @Override
293        public String getEntityKey() {
294            return "#composite#" + callables.get(0).getEntityKey();
295        }
296
297        @Override
298        public int getPriority() {
299            return priority;
300        }
301
302        @Override
303        public long getCreatedTime() {
304            return createdTime;
305        }
306
307        @Override
308        public void setInterruptMode(boolean mode) {
309        }
310
311        @Override
312        public boolean inInterruptMode() {
313            return false;
314        }
315
316        public List<XCallable<?>> getCallables() {
317            return this.callables;
318        }
319
320        public Void call() throws Exception {
321            XLog log = XLog.getLog(getClass());
322
323            for (XCallable<?> callable : callables) {
324                log.trace("executing callable [{0}]", callable.getName());
325                try {
326                    callable.call();
327                    incrCounter(INSTR_EXECUTED_COUNTER, 1);
328                    log.trace("executed callable [{0}]", callable.getName());
329                }
330                catch (Exception ex) {
331                    incrCounter(INSTR_FAILED_COUNTER, 1);
332                    log.warn("exception callable [{0}], {1}", callable.getName(), ex.getMessage(), ex);
333                }
334            }
335
336            // ticking -1 not to count the call to the composite callable
337            incrCounter(INSTR_EXECUTED_COUNTER, -1);
338            return null;
339        }
340
341        /*
342         * (non-Javadoc)
343         *
344         * @see java.lang.Object#toString()
345         */
346        @Override
347        public String toString() {
348            if (callables.size() == 0) {
349                return null;
350            }
351            StringBuilder sb = new StringBuilder();
352            int size = callables.size();
353            for (int i = 0; i < size; i++) {
354                XCallable<?> callable = callables.get(i);
355                sb.append("(");
356                sb.append(callable.toString());
357                if (i + 1 == size) {
358                    sb.append(")");
359                }
360                else {
361                    sb.append("),");
362                }
363            }
364            return sb.toString();
365        }
366
367        /**
368         * Remove the duplicate callables from the list before queue them
369         *
370         * @return true if callables should be queued
371         */
372        public boolean removeDuplicates() {
373            Set<String> set = new HashSet<String>();
374            List<XCallable<?>> filteredCallables = new ArrayList<XCallable<?>>();
375            if (callables.size() == 0) {
376                return false;
377            }
378            for (XCallable<?> callable : callables) {
379                if (!uniqueCallables.containsKey(callable.getKey()) && !set.contains(callable.getKey())) {
380                    filteredCallables.add(callable);
381                    set.add(callable.getKey());
382                }
383            }
384            callables = filteredCallables;
385            if (callables.size() == 0) {
386                return false;
387            }
388            return true;
389        }
390
391        /**
392         * Add the keys to the set
393         */
394        public void addToUniqueCallables() {
395            for (XCallable<?> callable : callables) {
396                ((ConcurrentHashMap<String, Date>) uniqueCallables).putIfAbsent(callable.getKey(), new Date());
397            }
398        }
399
400        /**
401         * Remove the keys from the set
402         */
403        public void removeFromUniqueCallables() {
404            for (XCallable<?> callable : callables) {
405                uniqueCallables.remove(callable.getKey());
406            }
407        }
408    }
409
410    private XLog log = XLog.getLog(getClass());
411
412    private int queueSize;
413    private PriorityDelayQueue<CallableWrapper> queue;
414    private ThreadPoolExecutor executor;
415    private Instrumentation instrumentation;
416
417    /**
418     * Convenience method for instrumentation counters.
419     *
420     * @param name counter name.
421     * @param count count to increment the counter.
422     */
423    private void incrCounter(String name, int count) {
424        if (instrumentation != null) {
425            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
426        }
427    }
428
429    private void addInQueueCron(Instrumentation.Cron cron) {
430        if (instrumentation != null) {
431            instrumentation.addCron(INSTRUMENTATION_GROUP, INSTR_IN_QUEUE_TIME_TIMER, cron);
432        }
433    }
434
435    /**
436     * Initialize the command queue service.
437     *
438     * @param services services instance.
439     */
440    @Override
441    @SuppressWarnings({ "unchecked", "rawtypes" })
442    public void init(Services services) {
443        Configuration conf = services.getConf();
444
445        queueSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
446        int threads = conf.getInt(CONF_THREADS, 10);
447        boolean callableNextEligible = conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
448
449        for (String type : conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
450            log.debug("Adding interrupt type [{0}]", type);
451            INTERRUPT_TYPES.add(type);
452        }
453
454        if (!callableNextEligible) {
455            queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
456                @Override
457                protected void debug(String msgTemplate, Object... msgArgs) {
458                    log.trace(msgTemplate, msgArgs);
459                }
460            };
461        }
462        else {
463            // If the head of this queue has already reached max concurrency,
464            // continuously find next one
465            // which has not yet reach max concurrency.Overrided method
466            // 'eligibleToPoll' to check if the
467            // element of this queue has reached the maximum concurrency.
468            queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
469                @Override
470                protected void debug(String msgTemplate, Object... msgArgs) {
471                    log.trace(msgTemplate, msgArgs);
472                }
473
474                @Override
475                protected boolean eligibleToPoll(QueueElement<?> element) {
476                    if (element != null) {
477                        CallableWrapper wrapper = (CallableWrapper) element;
478                        if (element.getElement() != null) {
479                            return callableReachMaxConcurrency(wrapper.getElement());
480                        }
481                    }
482                    return false;
483                }
484
485            };
486        }
487
488        interruptMapMaxSize = conf.getInt(CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, 100);
489
490        // IMPORTANT: The ThreadPoolExecutor does not always the execute
491        // commands out of the queue, there are
492        // certain conditions where commands are pushed directly to a thread.
493        // As we are using a queue with DELAYED semantics (i.e. execute the
494        // command in 5 mins) we need to make
495        // sure that the commands are always pushed to the queue.
496        // To achieve this (by looking a the ThreadPoolExecutor.execute()
497        // implementation, we are making the pool
498        // minimum size equals to the maximum size (thus threads are keep always
499        // running) and we are warming up
500        // all those threads (the for loop that runs dummy runnables).
501        executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue);
502
503        for (int i = 0; i < threads; i++) {
504            executor.execute(new Runnable() {
505                public void run() {
506                    try {
507                        Thread.sleep(100);
508                    }
509                    catch (InterruptedException ex) {
510                        log.warn("Could not warm up threadpool {0}", ex.getMessage(), ex);
511                    }
512                }
513            });
514        }
515
516        maxCallableConcurrency = conf.getInt(CONF_CALLABLE_CONCURRENCY, 3);
517    }
518
519    /**
520     * Destroy the command queue service.
521     */
522    @Override
523    public void destroy() {
524        try {
525            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
526            executor.shutdown();
527            queue.clear();
528            while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
529                log.info("Waiting for executor to shutdown");
530                if (System.currentTimeMillis() > limit) {
531                    log.warn("Gave up, continuing without waiting for executor to shutdown");
532                    break;
533                }
534            }
535        }
536        catch (InterruptedException ex) {
537            log.warn(ex);
538        }
539    }
540
541    /**
542     * Return the public interface for command queue service.
543     *
544     * @return {@link CallableQueueService}.
545     */
546    @Override
547    public Class<? extends Service> getInterface() {
548        return CallableQueueService.class;
549    }
550
551    /**
552     * @return int size of queue
553     */
554    public synchronized int queueSize() {
555        return queue.size();
556    }
557
558    private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) {
559        if (!ignoreQueueSize && queue.size() >= queueSize) {
560            log.warn("queue if full, ignoring queuing for [{0}]", wrapper.getElement());
561            return false;
562        }
563        if (!executor.isShutdown()) {
564            if (wrapper.filterDuplicates()) {
565                wrapper.addToUniqueCallables();
566                try {
567                    executor.execute(wrapper);
568                }
569                catch (RejectedExecutionException ree) {
570                    wrapper.removeFromUniqueCallables();
571                    throw ree;
572                }
573            }
574        }
575        else {
576            log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement());
577        }
578        return true;
579    }
580
581    /**
582     * Queue a callable for asynchronous execution.
583     *
584     * @param callable callable to queue.
585     * @return <code>true</code> if the callable was queued, <code>false</code> if the queue is full and the callable
586     *         was not queued.
587     */
588    public boolean queue(XCallable<?> callable) {
589        return queue(callable, 0);
590    }
591
592    /**
593     * Queue a list of callables for serial execution.
594     * <p/>
595     * Useful to serialize callables that may compete with each other for resources.
596     * <p/>
597     * All callables will be processed with the priority of the highest priority of all callables.
598     *
599     * @param callables callables to be executed by the composite callable.
600     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
601     *         were not queued.
602     */
603    public boolean queueSerial(List<? extends XCallable<?>> callables) {
604        return queueSerial(callables, 0);
605    }
606
607    /**
608     * Queue a callable for asynchronous execution sometime in the future.
609     *
610     * @param callable callable to queue for delayed execution
611     * @param delay time, in milliseconds, that the callable should be delayed.
612     * @return <code>true</code> if the callable was queued, <code>false</code>
613     *         if the queue is full and the callable was not queued.
614     */
615    public synchronized boolean queue(XCallable<?> callable, long delay) {
616        if (callable == null) {
617            return true;
618        }
619        boolean queued = false;
620        if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
621            log.warn("[queue] System is in SAFEMODE. Hence no callable is queued. current queue size " + queue.size());
622        }
623        else {
624            checkInterruptTypes(callable);
625            queued = queue(new CallableWrapper(callable, delay), false);
626            if (queued) {
627                incrCounter(INSTR_QUEUED_COUNTER, 1);
628            }
629            else {
630                log.warn("Could not queue callable");
631            }
632        }
633        return queued;
634    }
635
636    /**
637     * Queue a list of callables for serial execution sometime in the future.
638     * <p/>
639     * Useful to serialize callables that may compete with each other for resources.
640     * <p/>
641     * All callables will be processed with the priority of the highest priority of all callables.
642     *
643     * @param callables callables to be executed by the composite callable.
644     * @param delay time, in milliseconds, that the callable should be delayed.
645     * @return <code>true</code> if the callables were queued, <code>false</code> if the queue is full and the callables
646     *         were not queued.
647     */
648    public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay) {
649        boolean queued;
650        if (callables == null || callables.size() == 0) {
651            queued = true;
652        }
653        else if (callables.size() == 1) {
654            queued = queue(callables.get(0), delay);
655        }
656        else {
657            XCallable<?> callable = new CompositeCallable(callables);
658            queued = queue(callable, delay);
659            if (queued) {
660                incrCounter(INSTR_QUEUED_COUNTER, callables.size());
661            }
662        }
663        return queued;
664    }
665
666    /**
667     * Instruments the callable queue service.
668     *
669     * @param instr instance to instrument the callable queue service to.
670     */
671    public void instrument(Instrumentation instr) {
672        instrumentation = instr;
673        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_QUEUE_SIZE_SAMPLER, 60, 1, new Instrumentation.Variable<Long>() {
674            public Long getValue() {
675                return (long) queue.size();
676            }
677        });
678        instr.addSampler(INSTRUMENTATION_GROUP, INSTR_THREADS_ACTIVE_SAMPLER, 60, 1,
679                new Instrumentation.Variable<Long>() {
680                    public Long getValue() {
681                        return (long) executor.getActiveCount();
682                    }
683                });
684    }
685
686    /**
687     * check the interrupt map for the existence of an interrupt commands if
688     * exist a List of Interrupt Callable for the same lock key will bereturned,
689     * otherwise it will return null
690     */
691    public Set<XCallable<?>> checkInterrupts(String lockKey) {
692
693        if (lockKey != null) {
694            return interruptCommandsMap.remove(lockKey);
695        }
696        return null;
697    }
698
699    /**
700     * check if the callable is of an interrupt type and insert it into the map
701     * accordingly
702     *
703     * @param callable
704     */
705    public void checkInterruptTypes(XCallable<?> callable) {
706        if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
707            for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
708                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
709                    insertCallableIntoInterruptMap(singleCallable);
710                }
711            }
712        }
713        else if (INTERRUPT_TYPES.contains(callable.getType())) {
714            insertCallableIntoInterruptMap(callable);
715        }
716    }
717
718    /**
719     * insert a new callable in the Interrupt Command Map add a new element to
720     * the list or create a new list accordingly
721     *
722     * @param callable
723     */
724    public void insertCallableIntoInterruptMap(XCallable<?> callable) {
725        if (interruptCommandsMap.size() < interruptMapMaxSize) {
726            Set<XCallable<?>> newSet = Collections.synchronizedSet(new LinkedHashSet<XCallable<?>>());
727            Set<XCallable<?>> interruptSet = interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
728            if (interruptSet == null) {
729                interruptSet = newSet;
730            }
731            if (interruptSet.add(callable)) {
732                log.trace("Inserting an interrupt element [{0}] to the interrupt map", callable.toString());
733            } else {
734                log.trace("Interrupt element [{0}] already present", callable.toString());
735            }
736        }
737        else {
738            log.warn(
739                    "The interrupt map reached max size of [{0}], an interrupt element [{1}] will not added to the map [{1}]",
740                    interruptCommandsMap.size(), callable.toString());
741        }
742    }
743
744    /**
745     * Get the list of strings of queue dump
746     *
747     * @return the list of string that representing each CallableWrapper
748     */
749    public List<String> getQueueDump() {
750        List<String> list = new ArrayList<String>();
751        for (QueueElement<CallableWrapper> qe : queue) {
752            if (qe.toString() == null) {
753                continue;
754            }
755            list.add(qe.toString());
756        }
757        return list;
758    }
759
760    /**
761     * Get the list of strings of uniqueness map dump
762     *
763     * @return the list of string that representing the key of each command in the queue
764     */
765    public List<String> getUniqueDump() {
766        List<String> list = new ArrayList<String>();
767        for (Entry<String, Date> entry : uniqueCallables.entrySet()) {
768            list.add(entry.toString());
769        }
770        return list;
771    }
772
773}