/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.slab;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * SlabCache is composed of multiple {@link SingleSizeCache}s, each backed by
 * a slab of off-heap memory that holds blocks of one fixed size. A TreeMap
 * keyed by slab block size routes each get and put to the smallest slab the
 * block fits in.
 *
 * <p>Deprecated: superseded by the bucket cache
 * ({@code org.apache.hadoop.hbase.io.hfile.bucket.BucketCache}).
 */
@InterfaceAudience.Private
@Deprecated
public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {

  private final ConcurrentHashMap<BlockCacheKey, SingleSizeCache> backingStore;
  private final TreeMap<Integer, SingleSizeCache> slabs;
  static final Log LOG = LogFactory.getLog(SlabCache.class);
  static final int STAT_THREAD_PERIOD_SECS = 60 * 5;

  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Slab Statistics #%d").build());

  long size;
  private final CacheStats stats;
  final SlabStats requestStats;
  final SlabStats successfullyCachedStats;
  private final long avgBlockSize;
  private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
      SlabCache.class, false);

  /**
   * Default constructor. Takes the total size reserved for the cache and the
   * average block size; the slab-size defaults in {@link #addSlabByConf} are
   * derived from the average.
   *
   * @param size total size of the slab cache, in bytes
   * @param avgBlockSize average size of a block being cached, in bytes
   */
  public SlabCache(long size, long avgBlockSize) {
    this.avgBlockSize = avgBlockSize;
    this.size = size;
    this.stats = new CacheStats();
    this.requestStats = new SlabStats();
    this.successfullyCachedStats = new SlabStats();

    backingStore = new ConcurrentHashMap<BlockCacheKey, SingleSizeCache>();
    slabs = new TreeMap<Integer, SingleSizeCache>();
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
  }
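
  /*
   * A minimal usage sketch (hypothetical values, not part of this class):
   * reserve 1 GB off heap for blocks averaging 64 KB, then let
   * addSlabByConf() split it into slabs per the configured proportions.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   SlabCache cache = new SlabCache(1024L * 1024 * 1024, 64 * 1024);
   *   cache.addSlabByConf(conf);
   */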

  public Map<Integer, SingleSizeCache> getSizer() {
    return slabs;
  }

  /**
   * Allocates the desired number of slabs of each configured size.
   *
   * Reads two parallel lists from conf: hbase.offheapcache.slab.proportions,
   * the fraction of the total cache size given to slabs of each block size,
   * and hbase.offheapcache.slab.sizes, the block size of each slab in bytes.
   * The defaults are two slabs, 80% at 1.1x and 20% at 2.1x the average
   * block size.
   *
   * @param conf the configuration to read the slab lists from
   */
  public void addSlabByConf(Configuration conf) {
    // Proportions and sizes are parallel arrays; they must be the same length.
    String[] proportions = conf.getStrings(
        "hbase.offheapcache.slab.proportions", "0.80", "0.20");
    String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes",
        Long.valueOf(avgBlockSize * 11 / 10).toString(),
        Long.valueOf(avgBlockSize * 21 / 10).toString());

    if (proportions.length != sizes.length) {
      throw new IllegalArgumentException(
          "SlabCache conf not initialized: hbase.offheapcache.slab.proportions specifies "
              + proportions.length
              + " slabs while hbase.offheapcache.slab.sizes specifies "
              + sizes.length + " slabs");
    }

    /*
     * We parse into BigDecimals rather than floats so that summing the
     * proportions is exact.
     */
    BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(proportions);
    BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);

    BigDecimal sumProportions = new BigDecimal(0);
    for (BigDecimal b : parsedProportions) {
      // Every proportion must be strictly positive.
      Preconditions
          .checkArgument(b.compareTo(BigDecimal.ZERO) == 1,
              "Proportions in hbase.offheapcache.slab.proportions must be greater than 0!");
      sumProportions = sumProportions.add(b);
    }

    // The proportions must not over-commit the cache.
    Preconditions
        .checkArgument(sumProportions.compareTo(BigDecimal.ONE) != 1,
            "Sum of all proportions in hbase.offheapcache.slab.proportions must not exceed 1");

    // Warn if a noticeable share of the cache is left unused.
    if (sumProportions.compareTo(new BigDecimal("0.99")) == -1) {
      LOG.warn("Sum of hbase.offheapcache.slab.proportions is less than 0.99! Memory is being wasted");
    }
    for (int i = 0; i < parsedProportions.length; i++) {
      int blockSize = parsedSizes[i].intValue();
      int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
          .divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
      addSlab(blockSize, numBlocks);
    }
  }
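
  /*
   * Worked example of the arithmetic above (hypothetical numbers): with
   * size = 1 GB, avgBlockSize = 64 KB, and the defaults, the slab block
   * sizes are 72089 bytes (1.1x) and 137625 bytes (2.1x). The first slab
   * gets 1 GB * 0.80 / 72089 = 11915 blocks and the second gets
   * 1 GB * 0.20 / 137625 = 1560 blocks, both rounded down.
   */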

  /**
   * Gets the slab that a block of the given size would be allocated to.
   *
   * @param size size of the block being checked, in bytes
   * @return the entry for the smallest slab whose block size is at least
   *         {@code size}, or null if the block is too large for every slab
   */
  Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
    return slabs.higherEntry(size - 1);
  }
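
  /*
   * Example (hypothetical slab sizes): with slabs keyed at 72089 and 137625,
   * getHigherBlock(65536) returns the 72089 entry, and because the lookup is
   * higherEntry(size - 1), getHigherBlock(72089) still maps to the 72089
   * slab rather than skipping to the next larger one.
   */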

  private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
    BigDecimal[] parsed = new BigDecimal[parsee.length];
    for (int i = 0; i < parsee.length; i++) {
      parsed[i] = new BigDecimal(parsee[i].trim());
    }
    return parsed;
  }

  private void addSlab(int blockSize, int numBlocks) {
    LOG.info("Creating a slab of blockSize " + blockSize + " with " + numBlocks
        + " blocks, " + StringUtils.humanReadableInt(blockSize * (long) numBlocks) + " bytes.");
    slabs.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
  }

  /**
   * Cache the block with the specified key and buffer. First finds what size
   * SingleSizeCache it should fit in. If the block doesn't fit in any, this
   * returns without caching anything.
   *
   * <p>It is assumed this will NEVER be called on an already cached block.
   * If it is, the underlying SingleSizeCache treats it as a re-insert of the
   * same block caused by a race condition and throws a runtime exception.
   *
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem) {
    Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
        .getSerializedLength());

    this.requestStats.addin(cachedItem.getSerializedLength());

    if (scacheEntry == null) {
      return; // the block is larger than every slab; drop it silently
    }

    this.successfullyCachedStats.addin(cachedItem.getSerializedLength());
    SingleSizeCache scache = scacheEntry.getValue();

    /*
     * This throws a runtime exception if we try to cache the same value
     * twice.
     */
    scache.cacheBlock(cacheKey, cachedItem);
  }
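
  /*
   * Sizing example (hypothetical slab sizes 72089 and 137625): a 100000-byte
   * block lands in the 137625 slab, a 65536-byte block in the 72089 slab,
   * and a 150000-byte block is counted in requestStats but never cached.
   */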

  /**
   * We don't care about whether the block is in-memory or not, so we just
   * pass the call through.
   */
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    cacheBlock(cacheKey, buf);
  }

  public CacheStats getStats() {
    return this.stats;
  }

  /**
   * Get the buffer of the block with the specified key.
   *
   * @param key block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat whether this is a repeat lookup for the same block
   * @param updateCacheMetrics whether to update cache metrics or not
   * @return buffer of the specified block, or null if not in cache
   */
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    SingleSizeCache cachedBlock = backingStore.get(key);
    if (cachedBlock == null) {
      if (!repeat) stats.miss(caching);
      return null;
    }

    Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics);

    if (contentBlock != null) {
      if (updateCacheMetrics) stats.hit(caching);
    } else if (!repeat) {
      if (updateCacheMetrics) stats.miss(caching);
    }
    return contentBlock;
  }

  /**
   * Evicts a block from the cache. This is public, and thus contributes to
   * the eviction counter.
   *
   * @return true if the block existed and was evicted, false otherwise
   */
  public boolean evictBlock(BlockCacheKey cacheKey) {
    SingleSizeCache cacheEntry = backingStore.get(cacheKey);
    if (cacheEntry == null) {
      return false;
    } else {
      cacheEntry.evictBlock(cacheKey);
      return true;
    }
  }

  @Override
  public void onEviction(BlockCacheKey key, SingleSizeCache notifier) {
    stats.evicted();
    backingStore.remove(key);
  }

  @Override
  public void onInsertion(BlockCacheKey key, SingleSizeCache notifier) {
    backingStore.put(key, notifier);
  }
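
  /*
   * Callback protocol, as implied by the two watcher methods above: each
   * SingleSizeCache reports inserts and evictions back to this SlabCache, so
   * backingStore always maps a key to the slab currently holding it. An
   * evictBlock() call therefore flows SlabCache -> SingleSizeCache ->
   * onEviction(), which is where the eviction stat is bumped and the index
   * entry removed.
   */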

  /**
   * Sends a shutdown to all SingleSizeCaches contained by this cache, then
   * terminates the statistics thread pool.
   */
  public void shutdown() {
    for (SingleSizeCache s : slabs.values()) {
      s.shutdown();
    }
    this.scheduleThreadPool.shutdown();
  }

  public long heapSize() {
    long childCacheSize = 0;
    for (SingleSizeCache s : slabs.values()) {
      childCacheSize += s.heapSize();
    }
    return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
  }

  public long size() {
    return this.size;
  }

  public long getFreeSize() {
    long childFreeSize = 0;
    for (SingleSizeCache s : slabs.values()) {
      childFreeSize += s.getFreeSize();
    }
    return childFreeSize;
  }

  @Override
  public long getBlockCount() {
    long count = 0;
    for (SingleSizeCache cache : slabs.values()) {
      count += cache.getBlockCount();
    }
    return count;
  }

  public long getCurrentSize() {
    return size;
  }

  public long getEvictedCount() {
    return stats.getEvictedCount();
  }

  /*
   * Statistics thread. Periodically logs the per-slab stats along with the
   * request-size histograms kept by this cache.
   */
  static class StatisticsThread extends HasThread {
    SlabCache ourcache;

    public StatisticsThread(SlabCache slabCache) {
      super("SlabCache.StatisticsThread");
      setDaemon(true);
      this.ourcache = slabCache;
    }

    @Override
    public void run() {
      for (SingleSizeCache s : ourcache.slabs.values()) {
        s.logStats();
      }

      SlabCache.LOG.info("Current heap size is: "
          + StringUtils.humanReadableInt(ourcache.heapSize()));

      LOG.info("Request Stats");
      ourcache.requestStats.logStats();
      LOG.info("Successfully Cached Stats");
      ourcache.successfullyCachedStats.logStats();
    }

  }

  /**
   * Just like CacheStats, but more specific to the slab cache: keeps a
   * histogram of request sizes on a logarithmic scale.
   */
  static class SlabStats {
    /*
     * MULTIPLIER controls the granularity: each natural-log unit of block
     * size is split into MULTIPLIER buckets, so NUMDIVISIONS buckets cover
     * every size up to Integer.MAX_VALUE.
     */
    static final int MULTIPLIER = 10;
    final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
    private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];

    public SlabStats() {
      for (int i = 0; i < NUMDIVISIONS; i++) {
        counts[i] = new AtomicLong();
      }
    }

    public void addin(int size) {
      int index = (int) (Math.log(size) * MULTIPLIER);
      // Clamp: log(0) is -Infinity, and sizes near Integer.MAX_VALUE would
      // otherwise index one past the end of the array.
      index = Math.min(Math.max(index, 0), NUMDIVISIONS - 1);
      counts[index].incrementAndGet();
    }

    public AtomicLong[] getUsage() {
      return counts;
    }

    double getUpperBound(int index) {
      return Math.pow(Math.E, ((index + 0.5) / MULTIPLIER));
    }

    double getLowerBound(int index) {
      return Math.pow(Math.E, ((index - 0.5) / MULTIPLIER));
    }
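
    /*
     * Bucket math example (computed from the formulas above): a 65536-byte
     * request has ln(65536) * 10 = 110.9, so addin() increments bucket 110,
     * which holds sizes in [e^11.0, e^11.1), roughly 59874 to 66171 bytes.
     * Note that logStats() labels that bucket with e^10.95 to e^11.05
     * (about 56954 to 62944), half a bucket lower, because the bounds
     * assume a rounded rather than truncated index.
     */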

    public void logStats() {
      AtomicLong[] fineGrainedStats = getUsage();
      for (int i = 0; i < fineGrainedStats.length; i++) {
        // Only log buckets that have actually seen requests.
        if (fineGrainedStats[i].get() > 0) {
          SlabCache.LOG.info("From "
              + StringUtils.humanReadableInt((long) getLowerBound(i)) + " to "
              + StringUtils.humanReadableInt((long) getUpperBound(i)) + ": "
              + StringUtils.humanReadableInt(fineGrainedStats[i].get())
              + " requests");
        }
      }
    }
  }

  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = 0;
    for (BlockCacheKey key : backingStore.keySet()) {
      if (key.getHfileName().equals(hfileName)) {
        if (evictBlock(key))
          ++numEvicted;
      }
    }
    return numEvicted;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Walk the backing index and wrap each entry in a read-only CachedBlock view.
    final Iterator<Map.Entry<BlockCacheKey, SingleSizeCache>> i =
        this.backingStore.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, SingleSizeCache> e = i.next();
        final Cacheable cacheable = e.getValue().getBlock(e.getKey(), false, false, false);
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            // Slabs have no priority tiers.
            return null;
          }

          @Override
          public BlockType getBlockType() {
            // The block may have been evicted between next() and this call.
            return cacheable == null? null: cacheable.getBlockType();
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return cacheable == null? 0: cacheable.getSerializedLength();
          }

          @Override
          public long getCachedTime() {
            // Insertion time is not tracked by the slab cache.
            return -1;
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            // Long.compare avoids int overflow on large offset differences.
            return Long.compare(this.getOffset(), other.getOffset());
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    // This cache is not composed of other BlockCaches.
    return null;
  }
}