View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.CopyOnWriteArrayList;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  
36  /**
37   *
38   * The <code>PoolMap</code> maps a key to a collection of values, the elements
39   * of which are managed by a pool. In effect, that collection acts as a shared
40   * pool of resources, access to which is closely controlled as per the semantics
41   * of the pool.
42   *
43   * <p>
44   * In case the size of the pool is set to a non-zero positive number, that is
45   * used to cap the number of resources that a pool may contain for any given
46   * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
47   * </p>
48   *
49   * @param <K>
50   *          the type of the key to the resource
51   * @param <V>
52   *          the type of the resource being pooled
53   */
54  @InterfaceAudience.Private
55  public class PoolMap<K, V> implements Map<K, V> {
56    private PoolType poolType;
57  
58    private int poolMaxSize;
59  
60    private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
61  
62    public PoolMap(PoolType poolType) {
63      this.poolType = poolType;
64    }
65  
66    public PoolMap(PoolType poolType, int poolMaxSize) {
67      this.poolType = poolType;
68      this.poolMaxSize = poolMaxSize;
69    }
70  
71    @Override
72    public V get(Object key) {
73      Pool<V> pool = pools.get(key);
74      return pool != null ? pool.get() : null;
75    }
76  
77    @Override
78    public V put(K key, V value) {
79      Pool<V> pool = pools.get(key);
80      if (pool == null) {
81        pools.put(key, pool = createPool());
82      }
83      return pool != null ? pool.put(value) : null;
84    }
85  
86    @SuppressWarnings("unchecked")
87    @Override
88    public V remove(Object key) {
89      Pool<V> pool = pools.remove(key);
90      if (pool != null) {
91        removeValue((K) key, pool.get());
92      }
93      return null;
94    }
95  
96    /**
97     * @deprecated Will be removed for Java 8, use {@link #removeValue} instead
98     */
99    @Deprecated
100   public boolean remove(K key, V value) {
101     return removeValue(key, value);
102   }
103 
104   public boolean removeValue(K key, V value) {
105     Pool<V> pool = pools.get(key);
106     boolean res = false;
107     if (pool != null) {
108       res = pool.remove(value);
109       if (res && pool.size() == 0) {
110         pools.remove(key);
111       }
112     }
113     return res;
114   }
115 
116   @Override
117   public Collection<V> values() {
118     Collection<V> values = new ArrayList<V>();
119     for (Pool<V> pool : pools.values()) {
120       Collection<V> poolValues = pool.values();
121       if (poolValues != null) {
122         values.addAll(poolValues);
123       }
124     }
125     return values;
126   }
127 
128   public Collection<V> values(K key) {
129     Collection<V> values = new ArrayList<V>();
130     Pool<V> pool = pools.get(key);
131     if (pool != null) {
132       Collection<V> poolValues = pool.values();
133       if (poolValues != null) {
134         values.addAll(poolValues);
135       }
136     }
137     return values;
138   }
139 
140 
141   @Override
142   public boolean isEmpty() {
143     return pools.isEmpty();
144   }
145 
146   @Override
147   public int size() {
148     return pools.size();
149   }
150 
151   public int size(K key) {
152     Pool<V> pool = pools.get(key);
153     return pool != null ? pool.size() : 0;
154   }
155 
156   @Override
157   public boolean containsKey(Object key) {
158     return pools.containsKey(key);
159   }
160 
161   @Override
162   public boolean containsValue(Object value) {
163     if (value == null) {
164       return false;
165     }
166     for (Pool<V> pool : pools.values()) {
167       if (value.equals(pool.get())) {
168         return true;
169       }
170     }
171     return false;
172   }
173 
174   @Override
175   public void putAll(Map<? extends K, ? extends V> map) {
176     for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
177       put(entry.getKey(), entry.getValue());
178     }
179   }
180 
181   @Override
182   public void clear() {
183     for (Pool<V> pool : pools.values()) {
184       pool.clear();
185     }
186     pools.clear();
187   }
188 
189   @Override
190   public Set<K> keySet() {
191     return pools.keySet();
192   }
193 
194   @Override
195   public Set<Map.Entry<K, V>> entrySet() {
196     Set<Map.Entry<K, V>> entries = new HashSet<Entry<K, V>>();
197     for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
198       final K poolKey = poolEntry.getKey();
199       final Pool<V> pool = poolEntry.getValue();
200       if (pool != null) {
201         for (final V poolValue : pool.values()) {
202           entries.add(new Map.Entry<K, V>() {
203             @Override
204             public K getKey() {
205               return poolKey;
206             }
207 
208             @Override
209             public V getValue() {
210               return poolValue;
211             }
212 
213             @Override
214             public V setValue(V value) {
215               return pool.put(value);
216             }
217           });
218         }
219       }
220     }
221     return null;
222   }
223 
224   protected interface Pool<R> {
225     R get();
226 
227     R put(R resource);
228 
229     boolean remove(R resource);
230 
231     void clear();
232 
233     Collection<R> values();
234 
235     int size();
236   }
237 
238   public enum PoolType {
239     Reusable, ThreadLocal, RoundRobin;
240 
241     public static PoolType valueOf(String poolTypeName,
242         PoolType defaultPoolType, PoolType... allowedPoolTypes) {
243       PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
244       if (poolType != null) {
245         boolean allowedType = false;
246         if (poolType.equals(defaultPoolType)) {
247           allowedType = true;
248         } else {
249           if (allowedPoolTypes != null) {
250             for (PoolType allowedPoolType : allowedPoolTypes) {
251               if (poolType.equals(allowedPoolType)) {
252                 allowedType = true;
253                 break;
254               }
255             }
256           }
257         }
258         if (!allowedType) {
259           poolType = null;
260         }
261       }
262       return (poolType != null) ? poolType : defaultPoolType;
263     }
264 
265     public static String fuzzyNormalize(String name) {
266       return name != null ? name.replaceAll("-", "").trim().toLowerCase() : "";
267     }
268 
269     public static PoolType fuzzyMatch(String name) {
270       for (PoolType poolType : values()) {
271         if (fuzzyNormalize(name).equals(fuzzyNormalize(poolType.name()))) {
272           return poolType;
273         }
274       }
275       return null;
276     }
277   }
278 
279   protected Pool<V> createPool() {
280     switch (poolType) {
281     case Reusable:
282       return new ReusablePool<V>(poolMaxSize);
283     case RoundRobin:
284       return new RoundRobinPool<V>(poolMaxSize);
285     case ThreadLocal:
286       return new ThreadLocalPool<V>();
287     }
288     return null;
289   }
290 
291   /**
292    * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
293    * on the {@link LinkedList} class. It essentially allows resources to be
294    * checked out, at which point it is removed from this pool. When the resource
295    * is no longer required, it should be returned to the pool in order to be
296    * reused.
297    *
298    * <p>
299    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
300    * the pool is unbounded. Otherwise, it caps the number of consumers that can
301    * check out a resource from this pool to the (non-zero positive) value
302    * specified in {@link #maxSize}.
303    * </p>
304    *
305    * @param <R>
306    *          the type of the resource
307    */
308   @SuppressWarnings("serial")
309   public class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
310     private int maxSize;
311 
312     public ReusablePool(int maxSize) {
313       this.maxSize = maxSize;
314 
315     }
316 
317     @Override
318     public R get() {
319       return poll();
320     }
321 
322     @Override
323     public R put(R resource) {
324       if (super.size() < maxSize) {
325         add(resource);
326       }
327       return null;
328     }
329 
330     @Override
331     public Collection<R> values() {
332       return this;
333     }
334   }
335 
336   /**
337    * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
338    * stores its resources in an {@link ArrayList}. It load-balances access to
339    * its resources by returning a different resource every time a given key is
340    * looked up.
341    *
342    * <p>
343    * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
344    * the pool is unbounded. Otherwise, it caps the number of resources in this
345    * pool to the (non-zero positive) value specified in {@link #maxSize}.
346    * </p>
347    *
348    * @param <R>
349    *          the type of the resource
350    *
351    */
352   @SuppressWarnings("serial")
353   class RoundRobinPool<R> extends CopyOnWriteArrayList<R> implements Pool<R> {
354     private int maxSize;
355     private int nextResource = 0;
356 
357     public RoundRobinPool(int maxSize) {
358       this.maxSize = maxSize;
359     }
360 
361     @Override
362     public R put(R resource) {
363       if (super.size() < maxSize) {
364         add(resource);
365       }
366       return null;
367     }
368 
369     @Override
370     public R get() {
371       if (super.size() < maxSize) {
372         return null;
373       }
374       nextResource %= super.size();
375       R resource = get(nextResource++);
376       return resource;
377     }
378 
379     @Override
380     public Collection<R> values() {
381       return this;
382     }
383 
384   }
385 
386   /**
387    * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
388    * builds on the {@link ThreadLocal} class. It essentially binds the resource
389    * to the thread from which it is accessed.
390    *
391    * <p>
392    * Note that the size of the pool is essentially bounded by the number of threads
393    * that add resources to this pool.
394    * </p>
395    *
396    * @param <R>
397    *          the type of the resource
398    */
399   static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
400     private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>();
401 
402     public ThreadLocalPool() {
403     }
404 
405     @Override
406     public R put(R resource) {
407       R previousResource = get();
408       if (previousResource == null) {
409         AtomicInteger poolSize = poolSizes.get(this);
410         if (poolSize == null) {
411           poolSizes.put(this, poolSize = new AtomicInteger(0));
412         }
413         poolSize.incrementAndGet();
414       }
415       this.set(resource);
416       return previousResource;
417     }
418 
419     @Override
420     public void remove() {
421       super.remove();
422       AtomicInteger poolSize = poolSizes.get(this);
423       if (poolSize != null) {
424         poolSize.decrementAndGet();
425       }
426     }
427 
428     @Override
429     public int size() {
430       AtomicInteger poolSize = poolSizes.get(this);
431       return poolSize != null ? poolSize.get() : 0;
432     }
433 
434     @Override
435     public boolean remove(R resource) {
436       R previousResource = super.get();
437       if (resource != null && resource.equals(previousResource)) {
438         remove();
439         return true;
440       } else {
441         return false;
442       }
443     }
444 
445     @Override
446     public void clear() {
447       super.remove();
448     }
449 
450     @Override
451     public Collection<R> values() {
452       List<R> values = new ArrayList<R>();
453       values.add(get());
454       return values;
455     }
456   }
457 }