1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io;
19
20 import java.nio.ByteBuffer;
21 import java.util.Queue;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28
29 import com.google.common.annotations.VisibleForTesting;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class BoundedByteBufferPool {
49 private final Log LOG = LogFactory.getLog(this.getClass());
50
51 @VisibleForTesting
52 final Queue<ByteBuffer> buffers;
53
54
55 private final int maxByteBufferSizeToCache;
56
57
58 private volatile int runningAverage;
59
60
61 private volatile int totalReservoirCapacity;
62
63
64 private AtomicLong allocations = new AtomicLong(0);
65
66
67
68
69
70
71 public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
72 final int maxToCache) {
73 this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
74 this.runningAverage = initialByteBufferSize;
75 this.buffers = new ArrayBlockingQueue<ByteBuffer>(maxToCache, true);
76 }
77
78 public ByteBuffer getBuffer() {
79 ByteBuffer bb = this.buffers.poll();
80 if (bb != null) {
81
82 bb.clear();
83 this.totalReservoirCapacity -= bb.capacity();
84 } else {
85 bb = ByteBuffer.allocate(this.runningAverage);
86 this.allocations.incrementAndGet();
87 }
88 if (LOG.isTraceEnabled()) {
89 LOG.trace("runningAverage=" + this.runningAverage +
90 ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
91 ", allocations=" + this.allocations.get());
92 }
93 return bb;
94 }
95
96 public void putBuffer(ByteBuffer bb) {
97
98 if (bb.capacity() > this.maxByteBufferSizeToCache) return;
99 if (!this.buffers.offer(bb)) {
100 LOG.warn("At capacity: " + this.buffers.size());
101 } else {
102 int size = this.buffers.size();
103 this.totalReservoirCapacity += bb.capacity();
104 int average = 0;
105 if (size != 0) {
106 average = this.totalReservoirCapacity / size;
107 }
108 if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
109 this.runningAverage = average;
110 }
111 }
112 }
113 }