1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.LinkedBlockingQueue;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29
30 import com.google.common.base.Preconditions;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 @InterfaceAudience.Private
54 public class MemStoreLAB {
55 private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
56
57 private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
58
59 final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
60 final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
61 final int chunkSize;
62
63 final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
64 final static int MAX_ALLOC_DEFAULT = 256 * 1024;
65 final int maxAlloc;
66
67 private final MemStoreChunkPool chunkPool;
68
69
70
71 private volatile boolean closed = false;
72
73
74 private AtomicBoolean reclaimed = new AtomicBoolean(false);
75
76 private final AtomicInteger openScannerCount = new AtomicInteger();
77
78
79 public MemStoreLAB() {
80 this(new Configuration());
81 }
82
83 private MemStoreLAB(Configuration conf) {
84 this(conf, MemStoreChunkPool.getPool(conf));
85 }
86
87 public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
88 chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
89 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
90 this.chunkPool = pool;
91
92
93 Preconditions.checkArgument(
94 maxAlloc <= chunkSize,
95 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
96 }
97
98
99
100
101
102
103
104 public Allocation allocateBytes(int size) {
105 Preconditions.checkArgument(size >= 0, "negative size");
106
107
108
109 if (size > maxAlloc) {
110 return null;
111 }
112
113 while (true) {
114 Chunk c = getOrMakeChunk();
115
116
117 int allocOffset = c.alloc(size);
118 if (allocOffset != -1) {
119
120
121 return new Allocation(c.data, allocOffset);
122 }
123
124
125
126 tryRetireChunk(c);
127 }
128 }
129
130
131
132
133
134 void close() {
135 this.closed = true;
136
137
138 if (chunkPool != null && openScannerCount.get() == 0
139 && reclaimed.compareAndSet(false, true)) {
140 chunkPool.putbackChunks(this.chunkQueue);
141 }
142 }
143
144
145
146
147 void incScannerCount() {
148 this.openScannerCount.incrementAndGet();
149 }
150
151
152
153
154 void decScannerCount() {
155 int count = this.openScannerCount.decrementAndGet();
156 if (chunkPool != null && count == 0 && this.closed
157 && reclaimed.compareAndSet(false, true)) {
158 chunkPool.putbackChunks(this.chunkQueue);
159 }
160 }
161
162
163
164
165
166
167 private void tryRetireChunk(Chunk c) {
168 curChunk.compareAndSet(c, null);
169
170
171
172
173
174
175 }
176
177
178
179
180
181 private Chunk getOrMakeChunk() {
182 while (true) {
183
184 Chunk c = curChunk.get();
185 if (c != null) {
186 return c;
187 }
188
189
190
191
192 c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
193 if (curChunk.compareAndSet(null, c)) {
194
195
196 c.init();
197 this.chunkQueue.add(c);
198 return c;
199 } else if (chunkPool != null) {
200 chunkPool.putbackChunk(c);
201 }
202
203
204 }
205 }
206
207
208
209
210 static class Chunk {
211
212 private byte[] data;
213
214 private static final int UNINITIALIZED = -1;
215 private static final int OOM = -2;
216
217
218
219
220 private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
221
222
223 private AtomicInteger allocCount = new AtomicInteger();
224
225
226 private final int size;
227
228
229
230
231
232
233 Chunk(int size) {
234 this.size = size;
235 }
236
237
238
239
240
241
242 public void init() {
243 assert nextFreeOffset.get() == UNINITIALIZED;
244 try {
245 if (data == null) {
246 data = new byte[size];
247 }
248 } catch (OutOfMemoryError e) {
249 boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
250 assert failInit;
251 throw e;
252 }
253
254 boolean initted = nextFreeOffset.compareAndSet(
255 UNINITIALIZED, 0);
256
257
258 Preconditions.checkState(initted,
259 "Multiple threads tried to init same chunk");
260 }
261
262
263
264
265 void reset() {
266 if (nextFreeOffset.get() != UNINITIALIZED) {
267 nextFreeOffset.set(UNINITIALIZED);
268 allocCount.set(0);
269 }
270 }
271
272
273
274
275
276 public int alloc(int size) {
277 while (true) {
278 int oldOffset = nextFreeOffset.get();
279 if (oldOffset == UNINITIALIZED) {
280
281
282
283
284 Thread.yield();
285 continue;
286 }
287 if (oldOffset == OOM) {
288
289 return -1;
290 }
291
292 if (oldOffset + size > data.length) {
293 return -1;
294 }
295
296
297 if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
298
299 allocCount.incrementAndGet();
300 return oldOffset;
301 }
302
303 }
304 }
305
306 @Override
307 public String toString() {
308 return "Chunk@" + System.identityHashCode(this) +
309 " allocs=" + allocCount.get() + "waste=" +
310 (data.length - nextFreeOffset.get());
311 }
312 }
313
314
315
316
317
318
319 public static class Allocation {
320 private final byte[] data;
321 private final int offset;
322
323 private Allocation(byte[] data, int off) {
324 this.data = data;
325 this.offset = off;
326 }
327
328 @Override
329 public String toString() {
330 return "Allocation(" + "capacity=" + data.length + ", off=" + offset
331 + ")";
332 }
333
334 byte[] getData() {
335 return data;
336 }
337
338 int getOffset() {
339 return offset;
340 }
341 }
342 }