View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.io;
19  
20  import java.nio.ByteBuffer;
21  import java.util.Queue;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  
29  import com.google.common.annotations.VisibleForTesting;
30  
31  /**
32   * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer.
33   * This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size
34   * of ByteBuffer that it will retain (Hence the pool is 'bounded' as opposed to, say,
35   * Hadoop's ElasticByteBuffferPool).
36   * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
37   * rather than add it to the pool. If more ByteBuffers than the configured maximum instances,
38   * we will not add the passed ByteBuffer to the pool; we will just drop it
39   * (we will log a WARN in this case that we are at capacity).
40   *
41   * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
42   * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
43   * class for a couple of seconds to get reporting on how it is running when deployed.
44   *
45   * <p>This class is thread safe.
46   */
47  @InterfaceAudience.Private
48  public class BoundedByteBufferPool {
49    private final Log LOG = LogFactory.getLog(this.getClass());
50  
51    @VisibleForTesting
52    final Queue<ByteBuffer> buffers;
53  
54    // Maximum size of a ByteBuffer to retain in pool
55    private final int maxByteBufferSizeToCache;
56  
57    // A running average only it only rises, it never recedes
58    private volatile int runningAverage;
59  
60    // Scratch that keeps rough total size of pooled bytebuffers
61    private volatile int totalReservoirCapacity;
62  
63    // For reporting
64    private AtomicLong allocations = new AtomicLong(0);
65  
66    /**
67     * @param maxByteBufferSizeToCache
68     * @param initialByteBufferSize
69     * @param maxToCache
70     */
71    public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
72        final int maxToCache) {
73      this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
74      this.runningAverage = initialByteBufferSize;
75      this.buffers = new ArrayBlockingQueue<ByteBuffer>(maxToCache, true);
76    }
77  
78    public ByteBuffer getBuffer() {
79      ByteBuffer bb = this.buffers.poll();
80      if (bb != null) {
81        // Clear sets limit == capacity.  Postion == 0.
82        bb.clear();
83        this.totalReservoirCapacity -= bb.capacity();
84      } else {
85        bb = ByteBuffer.allocate(this.runningAverage);
86        this.allocations.incrementAndGet();
87      }
88      if (LOG.isTraceEnabled()) {
89        LOG.trace("runningAverage=" + this.runningAverage +
90          ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
91          ", allocations=" + this.allocations.get());
92      }
93      return bb;
94    }
95  
96    public void putBuffer(ByteBuffer bb) {
97      // If buffer is larger than we want to keep around, just let it go.
98      if (bb.capacity() > this.maxByteBufferSizeToCache) return;
99      if (!this.buffers.offer(bb)) {
100       LOG.warn("At capacity: " + this.buffers.size());
101     } else {
102       int size = this.buffers.size(); // This size may be inexact.
103       this.totalReservoirCapacity += bb.capacity();
104       int average = 0;
105       if (size != 0) {
106         average = this.totalReservoirCapacity / size;
107       }
108       if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
109         this.runningAverage = average;
110       }
111     }
112   }
113 }