View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.util.Arrays;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import com.google.common.base.Objects;
30  import com.google.common.base.Preconditions;
31  import com.google.common.primitives.Ints;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
36  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
37  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
38  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
39  
40  /**
41   * This class is used to allocate a block with specified size and free the block
42   * when evicting. It manages an array of buckets, each bucket is associated with
43   * a size and caches elements up to this size. For completely empty bucket, this
44   * size could be re-specified dynamically.
45   * 
46   * This class is not thread safe.
47   */
48  @InterfaceAudience.Private
49  @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
50  public final class BucketAllocator {
51    static final Log LOG = LogFactory.getLog(BucketAllocator.class);
52  
53    @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
54    public final static class Bucket {
55      private long baseOffset;
56      private int itemAllocationSize, sizeIndex;
57      private int itemCount;
58      private int freeList[];
59      private int freeCount, usedCount;
60  
61      public Bucket(long offset) {
62        baseOffset = offset;
63        sizeIndex = -1;
64      }
65  
66      void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
67        Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
68        this.sizeIndex = sizeIndex;
69        itemAllocationSize = bucketSizes[sizeIndex];
70        itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
71        freeCount = itemCount;
72        usedCount = 0;
73        freeList = new int[itemCount];
74        for (int i = 0; i < freeCount; ++i)
75          freeList[i] = i;
76      }
77  
78      public boolean isUninstantiated() {
79        return sizeIndex == -1;
80      }
81  
82      public int sizeIndex() {
83        return sizeIndex;
84      }
85  
86      public int getItemAllocationSize() {
87        return itemAllocationSize;
88      }
89  
90      public boolean hasFreeSpace() {
91        return freeCount > 0;
92      }
93  
94      public boolean isCompletelyFree() {
95        return usedCount == 0;
96      }
97  
98      public int freeCount() {
99        return freeCount;
100     }
101 
102     public int usedCount() {
103       return usedCount;
104     }
105 
106     public int getFreeBytes() {
107       return freeCount * itemAllocationSize;
108     }
109 
110     public int getUsedBytes() {
111       return usedCount * itemAllocationSize;
112     }
113 
114     public long getBaseOffset() {
115       return baseOffset;
116     }
117 
118     /**
119      * Allocate a block in this bucket, return the offset representing the
120      * position in physical space
121      * @return the offset in the IOEngine
122      */
123     public long allocate() {
124       assert freeCount > 0; // Else should not have been called
125       assert sizeIndex != -1;
126       ++usedCount;
127       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
128       assert offset >= 0;
129       return offset;
130     }
131 
132     public void addAllocation(long offset) throws BucketAllocatorException {
133       offset -= baseOffset;
134       if (offset < 0 || offset % itemAllocationSize != 0)
135         throw new BucketAllocatorException(
136             "Attempt to add allocation for bad offset: " + offset + " base="
137                 + baseOffset + ", bucket size=" + itemAllocationSize);
138       int idx = (int) (offset / itemAllocationSize);
139       boolean matchFound = false;
140       for (int i = 0; i < freeCount; ++i) {
141         if (matchFound) freeList[i - 1] = freeList[i];
142         else if (freeList[i] == idx) matchFound = true;
143       }
144       if (!matchFound)
145         throw new BucketAllocatorException("Couldn't find match for index "
146             + idx + " in free list");
147       ++usedCount;
148       --freeCount;
149     }
150 
151     private void free(long offset) {
152       offset -= baseOffset;
153       assert offset >= 0;
154       assert offset < itemCount * itemAllocationSize;
155       assert offset % itemAllocationSize == 0;
156       assert usedCount > 0;
157       assert freeCount < itemCount; // Else duplicate free
158       int item = (int) (offset / (long) itemAllocationSize);
159       assert !freeListContains(item);
160       --usedCount;
161       freeList[freeCount++] = item;
162     }
163 
164     private boolean freeListContains(int blockNo) {
165       for (int i = 0; i < freeCount; ++i) {
166         if (freeList[i] == blockNo) return true;
167       }
168       return false;
169     }
170   }
171 
172   final class BucketSizeInfo {
173     // Free bucket means it has space to allocate a block;
174     // Completely free bucket means it has no block.
175     private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
176     private int sizeIndex;
177 
178     BucketSizeInfo(int sizeIndex) {
179       bucketList = new LinkedList<Bucket>();
180       freeBuckets = new LinkedList<Bucket>();
181       completelyFreeBuckets = new LinkedList<Bucket>();
182       this.sizeIndex = sizeIndex;
183     }
184 
185     public synchronized void instantiateBucket(Bucket b) {
186       assert b.isUninstantiated() || b.isCompletelyFree();
187       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
188       bucketList.add(b);
189       freeBuckets.add(b);
190       completelyFreeBuckets.add(b);
191     }
192 
193     public int sizeIndex() {
194       return sizeIndex;
195     }
196 
197     /**
198      * Find a bucket to allocate a block
199      * @return the offset in the IOEngine
200      */
201     public long allocateBlock() {
202       Bucket b = null;
203       if (freeBuckets.size() > 0) // Use up an existing one first...
204         b = freeBuckets.get(freeBuckets.size() - 1);
205       if (b == null) {
206         b = grabGlobalCompletelyFreeBucket();
207         if (b != null) instantiateBucket(b);
208       }
209       if (b == null) return -1;
210       long result = b.allocate();
211       blockAllocated(b);
212       return result;
213     }
214 
215     void blockAllocated(Bucket b) {
216       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
217       if (!b.hasFreeSpace()) freeBuckets.remove(b);
218     }
219 
220     public Bucket findAndRemoveCompletelyFreeBucket() {
221       Bucket b = null;
222       assert bucketList.size() > 0;
223       if (bucketList.size() == 1) {
224         // So we never get complete starvation of a bucket for a size
225         return null;
226       }
227 
228       if (completelyFreeBuckets.size() > 0) {
229         b = completelyFreeBuckets.get(0);
230         removeBucket(b);
231       }
232       return b;
233     }
234 
235     private synchronized void removeBucket(Bucket b) {
236       assert b.isCompletelyFree();
237       bucketList.remove(b);
238       freeBuckets.remove(b);
239       completelyFreeBuckets.remove(b);
240     }
241 
242     public void freeBlock(Bucket b, long offset) {
243       assert bucketList.contains(b);
244       // else we shouldn't have anything to free...
245       assert (!completelyFreeBuckets.contains(b));
246       b.free(offset);
247       if (!freeBuckets.contains(b)) freeBuckets.add(b);
248       if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
249     }
250 
251     public synchronized IndexStatistics statistics() {
252       long free = 0, used = 0;
253       for (Bucket b : bucketList) {
254         free += b.freeCount();
255         used += b.usedCount();
256       }
257       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
258     }
259 
260     @Override
261     public String toString() {
262       return Objects.toStringHelper(this.getClass())
263         .add("sizeIndex", sizeIndex)
264         .add("bucketSize", bucketSizes[sizeIndex])
265         .toString();
266     }
267   }
268 
269   // Default block size is 64K, so we choose more sizes near 64K, you'd better
270   // reset it according to your cluster's block size distribution
271   // TODO Support the view of block size distribution statistics
272   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
273       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
274       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
275       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
276       512 * 1024 + 1024 };
277 
278   /**
279    * Round up the given block size to bucket size, and get the corresponding
280    * BucketSizeInfo
281    */
282   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
283     for (int i = 0; i < bucketSizes.length; ++i)
284       if (blockSize <= bucketSizes[i])
285         return bucketSizeInfos[i];
286     return null;
287   }
288 
289   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
290 
291   private final int[] bucketSizes;
292   private final int bigItemSize;
293   // The capacity size for each bucket
294   private final long bucketCapacity;
295   private Bucket[] buckets;
296   private BucketSizeInfo[] bucketSizeInfos;
297   private final long totalSize;
298   private long usedSize = 0;
299 
300   BucketAllocator(long availableSpace, int[] bucketSizes)
301       throws BucketAllocatorException {
302     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
303     Arrays.sort(this.bucketSizes);
304     this.bigItemSize = Ints.max(this.bucketSizes);
305     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
306     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
307     if (buckets.length < this.bucketSizes.length)
308       throw new BucketAllocatorException(
309           "Bucket allocator size too small - must have room for at least "
310               + this.bucketSizes.length + " buckets");
311     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
312     for (int i = 0; i < this.bucketSizes.length; ++i) {
313       bucketSizeInfos[i] = new BucketSizeInfo(i);
314     }
315     for (int i = 0; i < buckets.length; ++i) {
316       buckets[i] = new Bucket(bucketCapacity * i);
317       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
318           .instantiateBucket(buckets[i]);
319     }
320     this.totalSize = ((long) buckets.length) * bucketCapacity;
321   }
322 
323   /**
324    * Rebuild the allocator's data structures from a persisted map.
325    * @param availableSpace capacity of cache
326    * @param map A map stores the block key and BucketEntry(block's meta data
327    *          like offset, length)
328    * @param realCacheSize cached data size statistics for bucket cache
329    * @throws BucketAllocatorException
330    */
331   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
332       AtomicLong realCacheSize) throws BucketAllocatorException {
333     this(availableSpace, bucketSizes);
334 
335     // each bucket has an offset, sizeindex. probably the buckets are too big
336     // in our default state. so what we do is reconfigure them according to what
337     // we've found. we can only reconfigure each bucket once; if more than once,
338     // we know there's a bug, so we just log the info, throw, and start again...
339     boolean[] reconfigured = new boolean[buckets.length];
340     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
341       long foundOffset = entry.getValue().offset();
342       int foundLen = entry.getValue().getLength();
343       int bucketSizeIndex = -1;
344       for (int i = 0; i < bucketSizes.length; ++i) {
345         if (foundLen <= bucketSizes[i]) {
346           bucketSizeIndex = i;
347           break;
348         }
349       }
350       if (bucketSizeIndex == -1) {
351         throw new BucketAllocatorException(
352             "Can't match bucket size for the block with size " + foundLen);
353       }
354       int bucketNo = (int) (foundOffset / bucketCapacity);
355       if (bucketNo < 0 || bucketNo >= buckets.length)
356         throw new BucketAllocatorException("Can't find bucket " + bucketNo
357             + ", total buckets=" + buckets.length
358             + "; did you shrink the cache?");
359       Bucket b = buckets[bucketNo];
360       if (reconfigured[bucketNo]) {
361         if (b.sizeIndex() != bucketSizeIndex)
362           throw new BucketAllocatorException(
363               "Inconsistent allocation in bucket map;");
364       } else {
365         if (!b.isCompletelyFree())
366           throw new BucketAllocatorException("Reconfiguring bucket "
367               + bucketNo + " but it's already allocated; corrupt data");
368         // Need to remove the bucket from whichever list it's currently in at
369         // the moment...
370         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
371         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
372         oldbsi.removeBucket(b);
373         bsi.instantiateBucket(b);
374         reconfigured[bucketNo] = true;
375       }
376       realCacheSize.addAndGet(foundLen);
377       buckets[bucketNo].addAllocation(foundOffset);
378       usedSize += buckets[bucketNo].getItemAllocationSize();
379       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
380     }
381   }
382 
383   public String toString() {
384     StringBuilder sb = new StringBuilder(1024);
385     for (int i = 0; i < buckets.length; ++i) {
386       Bucket b = buckets[i];
387       if (i > 0) sb.append(", ");
388       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
389       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
390     }
391     return sb.toString();
392   }
393 
394   public long getUsedSize() {
395     return this.usedSize;
396   }
397 
398   public long getFreeSize() {
399     return this.totalSize - getUsedSize();
400   }
401 
402   public long getTotalSize() {
403     return this.totalSize;
404   }
405 
406   /**
407    * Allocate a block with specified size. Return the offset
408    * @param blockSize size of block
409    * @throws BucketAllocatorException,CacheFullException
410    * @return the offset in the IOEngine
411    */
412   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
413       BucketAllocatorException {
414     assert blockSize > 0;
415     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
416     if (bsi == null) {
417       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
418         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
419         " to accomodate if size seems reasonable and you want it cached.");
420     }
421     long offset = bsi.allocateBlock();
422 
423     // Ask caller to free up space and try again!
424     if (offset < 0)
425       throw new CacheFullException(blockSize, bsi.sizeIndex());
426     usedSize += bucketSizes[bsi.sizeIndex()];
427     return offset;
428   }
429 
430   private Bucket grabGlobalCompletelyFreeBucket() {
431     for (BucketSizeInfo bsi : bucketSizeInfos) {
432       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
433       if (b != null) return b;
434     }
435     return null;
436   }
437 
438   /**
439    * Free a block with the offset
440    * @param offset block's offset
441    * @return size freed
442    */
443   public synchronized int freeBlock(long offset) {
444     int bucketNo = (int) (offset / bucketCapacity);
445     assert bucketNo >= 0 && bucketNo < buckets.length;
446     Bucket targetBucket = buckets[bucketNo];
447     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
448     usedSize -= targetBucket.getItemAllocationSize();
449     return targetBucket.getItemAllocationSize();
450   }
451 
452   public int sizeIndexOfAllocation(long offset) {
453     int bucketNo = (int) (offset / bucketCapacity);
454     assert bucketNo >= 0 && bucketNo < buckets.length;
455     Bucket targetBucket = buckets[bucketNo];
456     return targetBucket.sizeIndex();
457   }
458 
459   public int sizeOfAllocation(long offset) {
460     int bucketNo = (int) (offset / bucketCapacity);
461     assert bucketNo >= 0 && bucketNo < buckets.length;
462     Bucket targetBucket = buckets[bucketNo];
463     return targetBucket.getItemAllocationSize();
464   }
465 
466   static class IndexStatistics {
467     private long freeCount, usedCount, itemSize, totalCount;
468 
469     public long freeCount() {
470       return freeCount;
471     }
472 
473     public long usedCount() {
474       return usedCount;
475     }
476 
477     public long totalCount() {
478       return totalCount;
479     }
480 
481     public long freeBytes() {
482       return freeCount * itemSize;
483     }
484 
485     public long usedBytes() {
486       return usedCount * itemSize;
487     }
488 
489     public long totalBytes() {
490       return totalCount * itemSize;
491     }
492 
493     public long itemSize() {
494       return itemSize;
495     }
496 
497     public IndexStatistics(long free, long used, long itemSize) {
498       setTo(free, used, itemSize);
499     }
500 
501     public IndexStatistics() {
502       setTo(-1, -1, 0);
503     }
504 
505     public void setTo(long free, long used, long itemSize) {
506       this.itemSize = itemSize;
507       this.freeCount = free;
508       this.usedCount = used;
509       this.totalCount = free + used;
510     }
511   }
512 
513   public Bucket [] getBuckets() {
514     return this.buckets;
515   }
516 
517   public void dumpToLog() {
518     logStatistics();
519     StringBuilder sb = new StringBuilder();
520     for (Bucket b : buckets) {
521       sb.append("Bucket:").append(b.baseOffset).append('\n');
522       sb.append("  Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
523           + "; used:" + b.usedCount + "; freelist\n");
524       for (int i = 0; i < b.freeCount(); ++i)
525         sb.append(b.freeList[i]).append(',');
526       sb.append('\n');
527     }
528     LOG.info(sb);
529   }
530 
531   public void logStatistics() {
532     IndexStatistics total = new IndexStatistics();
533     IndexStatistics[] stats = getIndexStatistics(total);
534     LOG.info("Bucket allocator statistics follow:\n");
535     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
536         + total.usedBytes() + "; total bytes=" + total.totalBytes());
537     for (IndexStatistics s : stats) {
538       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
539           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
540     }
541   }
542 
543   public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
544     IndexStatistics[] stats = getIndexStatistics();
545     long totalfree = 0, totalused = 0;
546     for (IndexStatistics stat : stats) {
547       totalfree += stat.freeBytes();
548       totalused += stat.usedBytes();
549     }
550     grandTotal.setTo(totalfree, totalused, 1);
551     return stats;
552   }
553 
554   public IndexStatistics[] getIndexStatistics() {
555     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
556     for (int i = 0; i < stats.length; ++i)
557       stats[i] = bucketSizeInfos[i].statistics();
558     return stats;
559   }
560 
561   public long freeBlock(long freeList[]) {
562     long sz = 0;
563     for (int i = 0; i < freeList.length; ++i)
564       sz += freeBlock(freeList[i]);
565     return sz;
566   }
567 
568 }