1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FSDataOutputStream;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValue.KVComparator;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
48  import org.apache.hadoop.io.WritableUtils;
49  import org.apache.hadoop.util.StringUtils;
50  
51  /**
52   * Provides functionality to write ({@link BlockIndexWriter}) and read
53   * ({@link BlockIndexReader}) single-level and multi-level block indexes.
54   *
55   * Examples of how to use the block index writer can be found in
56   * {@link CompoundBloomFilterWriter} and {@link HFileWriterV2}. Examples of how
57   * to use the reader can be found in {@link HFileReaderV2} and
58   * TestHFileBlockIndex.
59   */
60  @InterfaceAudience.Private
61  public class HFileBlockIndex {
62  
63    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
64  
65    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
66  
67    /**
68     * The maximum size guideline for index blocks (leaf, intermediate, and
69     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
70     */
71    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
72  
73    /**
74     * The number of bytes stored in each "secondary index" entry in addition to
75     * key bytes in the non-root index block format. The first long is the file
76     * offset of the deeper-level block the entry points to, and the int that
77     * follows is that block's on-disk size without including header.
78     */
79    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
80        + Bytes.SIZEOF_LONG;
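  // Worked example (illustrative, derived from the constant above): a
  // secondary-index entry stores a long file offset (8 bytes) and an int
  // on-disk size (4 bytes) ahead of the key bytes, so an entry whose key is
  // 20 bytes long occupies SECONDARY_INDEX_ENTRY_OVERHEAD + 20 = 32 bytes.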
81  
82    /**
83     * Error message when trying to use inline block API in single-level mode.
84     */
85    private static final String INLINE_BLOCKS_NOT_ALLOWED =
86        "Inline blocks are not allowed in the single-level-only mode";
87  
88    /**
89     * The size of a meta-data record used for finding the mid-key in a
90     * multi-level index. Consists of the middle leaf-level index block offset
91     * (long), its on-disk size without header included (int), and the mid-key
92     * entry's zero-based index in that leaf index block.
93     */
94    private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
95        2 * Bytes.SIZEOF_INT;
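  // Layout sketch (derived from BlockIndexReader#readMultiLevelIndexRoot
  // below): the mid-key metadata record consists of
  //   long midLeafBlockOffset      (8 bytes)
  //   int  midLeafBlockOnDiskSize  (4 bytes)
  //   int  midKeyEntry             (4 bytes)
  // for a total of 8 + 2 * 4 = 16 bytes.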
96  
97    /**
98     * The reader will always hold the root-level index in memory. Index
99     * blocks at all other levels will be cached in the LRU cache in practice,
100    * although this API does not enforce that.
101    *
102    * All non-root (leaf and intermediate) index blocks contain what we call a
103    * "secondary index": an array of offsets to the entries within the block.
104    * This allows us to do binary search for the entry corresponding to the
105    * given key without having to deserialize the block.
106    */
107   public static class BlockIndexReader implements HeapSize {
108     /** Needed when doing lookups on blocks. */
109     private final KVComparator comparator;
110 
111     // Root-level data.
112     private byte[][] blockKeys;
113     private long[] blockOffsets;
114     private int[] blockDataSizes;
115     private int rootCount = 0;
116 
117     // Mid-key metadata.
118     private long midLeafBlockOffset = -1;
119     private int midLeafBlockOnDiskSize = -1;
120     private int midKeyEntry = -1;
121 
122     /** Pre-computed mid-key */
123     private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
124 
125     /**
126      * The number of levels in the block index tree. One if there is only root
127      * level, two for root and leaf levels, etc.
128      */
129     private int searchTreeLevel;
130 
131     /** A way to read {@link HFile} blocks at a given offset */
132     private CachingBlockReader cachingBlockReader;
133 
134     public BlockIndexReader(final KVComparator c, final int treeLevel,
135         final CachingBlockReader cachingBlockReader) {
136       this(c, treeLevel);
137       this.cachingBlockReader = cachingBlockReader;
138     }
139 
140     public BlockIndexReader(final KVComparator c, final int treeLevel)
141     {
142       comparator = c;
143       searchTreeLevel = treeLevel;
144     }
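    // Hedged usage sketch (not part of the original source; variable names
    // are hypothetical). A reader such as HFileReaderV2 would construct this
    // roughly as follows, taking the level count and entry count from the
    // fixed file trailer:
    //
    //   BlockIndexReader idx = new BlockIndexReader(
    //       KeyValue.COMPARATOR, trailer.getNumDataIndexLevels(), hfileReader);
    //   idx.readMultiLevelIndexRoot(rootIndexBlock, trailer.getDataIndexCount());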
145 
146     /**
147      * @return true if the block index is empty.
148      */
149     public boolean isEmpty() {
150       return blockKeys.length == 0;
151     }
152 
153     /**
154      * Verifies that the block index is non-empty and throws an
155      * {@link IllegalStateException} otherwise.
156      */
157     public void ensureNonEmpty() {
158       if (blockKeys.length == 0) {
159         throw new IllegalStateException("Block index is empty or not loaded");
160       }
161     }
162 
163     /**
164      * Return the data block which contains this key. This function will only
165      * be called when the HFile version is larger than 1.
166      *
167      * @param key the key we are looking for
168      * @param keyOffset the offset of the key in its byte array
169      * @param keyLength the length of the key
170      * @param currentBlock the current block, to avoid re-reading the same
171      *          block
172      * @return the data block which contains the given key, or null if the
173      *         key falls before the first key of the file
173      * @throws IOException
174      */
175     public HFileBlock seekToDataBlock(final byte[] key, int keyOffset,
176         int keyLength, HFileBlock currentBlock, boolean cacheBlocks,
177         boolean pread, boolean isCompaction)
178         throws IOException {
179       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, keyOffset, keyLength,
180           currentBlock, cacheBlocks, pread, isCompaction);
181       if (blockWithScanInfo == null) {
182         return null;
183       } else {
184         return blockWithScanInfo.getHFileBlock();
185       }
186     }
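    // Hedged usage sketch (hypothetical variable names): a scanner seeking to
    // a key could call
    //
    //   HFileBlock dataBlock = index.seekToDataBlock(key, 0, key.length,
    //       currentBlock, cacheBlocks, pread, isCompaction);
    //
    // where a null result means the key falls before the first key of the file.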
187 
188     /**
189      * Return the BlockWithScanInfo which contains the DataBlock with other scan info
190      * such as nextIndexedKey.
191      * This function will only be called when the HFile version is larger than 1.
192      *
193      * @param key the key we are looking for
194      * @param keyOffset the offset of the key in its byte array
195      * @param keyLength the length of the key
196      * @param currentBlock the current block, to avoid re-reading the same
197      *          block
198      * @param cacheBlocks whether the blocks read should be added to the block cache
199      * @param pread whether to use positional ("pread") reads
200      * @param isCompaction whether this read is on behalf of a compaction
201      * @return the BlockWithScanInfo which contains the DataBlock with other scan info
202      *         such as nextIndexedKey.
203      * @throws IOException
204      */
205     public BlockWithScanInfo loadDataBlockWithScanInfo(final byte[] key, int keyOffset,
206         int keyLength, HFileBlock currentBlock, boolean cacheBlocks,
207         boolean pread, boolean isCompaction)
208         throws IOException {
209       int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
210       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
211         return null;
212       }
213 
214       // the next indexed key
215       byte[] nextIndexedKey = null;
216 
217       // Read the next-level (intermediate or leaf) index block.
218       long currentOffset = blockOffsets[rootLevelIndex];
219       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
220 
221       if (rootLevelIndex < blockKeys.length - 1) {
222         nextIndexedKey = blockKeys[rootLevelIndex + 1];
223       } else {
224         nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
225       }
226 
227       int lookupLevel = 1; // How many levels deep we are in our lookup.
228       int index = -1;
229 
230       HFileBlock block;
231       while (true) {
232 
233         if (currentBlock != null && currentBlock.getOffset() == currentOffset)
234         {
235           // Avoid reading the same block again, even with caching turned off.
236           // This is crucial for compaction-type workload which might have
237           // caching turned off. This is like a one-block cache inside the
238           // scanner.
239           block = currentBlock;
240         } else {
241           // Call HFile's caching block reader API. We always cache index
242           // blocks, otherwise we might get terrible performance.
243           boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
244           BlockType expectedBlockType;
245           if (lookupLevel < searchTreeLevel - 1) {
246             expectedBlockType = BlockType.INTERMEDIATE_INDEX;
247           } else if (lookupLevel == searchTreeLevel - 1) {
248             expectedBlockType = BlockType.LEAF_INDEX;
249           } else {
250             // this also accounts for ENCODED_DATA
251             expectedBlockType = BlockType.DATA;
252           }
253           block = cachingBlockReader.readBlock(currentOffset,
254               currentOnDiskSize, shouldCache, pread, isCompaction, true,
255               expectedBlockType);
256         }
257 
258         if (block == null) {
259           throw new IOException("Failed to read block at offset " +
260               currentOffset + ", onDiskSize=" + currentOnDiskSize);
261         }
262 
263         // Found a data block, break the loop and check our level in the tree.
264         if (block.getBlockType().isData()) {
265           break;
266         }
267 
268         // Not a data block. This must be a leaf-level or intermediate-level
269         // index block. We don't allow going deeper than searchTreeLevel.
270         if (++lookupLevel > searchTreeLevel) {
271           throw new IOException("Search Tree Level overflow: lookupLevel="+
272               lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
273         }
274 
275         // Locate the entry corresponding to the given key in the non-root
276         // (leaf or intermediate-level) index block.
277         ByteBuffer buffer = block.getBufferWithoutHeader();
278         index = locateNonRootIndexEntry(buffer, key, keyOffset, keyLength, comparator);
279         if (index == -1) {
280           throw new IOException("The key "
281               + Bytes.toStringBinary(key, keyOffset, keyLength)
282             + " is before the first key of the non-root index block "
283               + block);
284         }
285 
286         currentOffset = buffer.getLong();
287         currentOnDiskSize = buffer.getInt();
288 
289         // Only update next indexed key if there is a next indexed key in the current level
290         byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
291         if (tmpNextIndexedKey != null) {
292           nextIndexedKey = tmpNextIndexedKey;
293         }
294       }
295 
296       if (lookupLevel != searchTreeLevel) {
297         throw new IOException("Reached a data block at level " + lookupLevel +
298             " but the number of levels is " + searchTreeLevel);
299       }
300 
301       // set the next indexed key for the current block.
302       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
303       return blockWithScanInfo;
304     }
305 
306     /**
307      * An approximation to the {@link HFile}'s mid-key. Operates on block
308      * boundaries, and does not go inside blocks. In other words, returns the
309      * first key of the middle block of the file.
310      *
311      * @return the first key of the middle block
312      */
313     public byte[] midkey() throws IOException {
314       if (rootCount == 0)
315         throw new IOException("HFile empty");
316 
317       byte[] targetMidKey = this.midKey.get();
318       if (targetMidKey != null) {
319         return targetMidKey;
320       }
321 
322       if (midLeafBlockOffset >= 0) {
323         if (cachingBlockReader == null) {
324           throw new IOException("Have to read the middle leaf block but " +
325               "no block reader available");
326         }
327 
328         // Caching, using pread, assuming this is not a compaction.
329         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
330             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
331             BlockType.LEAF_INDEX);
332 
333         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
334         int numDataBlocks = b.getInt();
335         int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
336         int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
337             keyRelOffset;
338         int keyOffset = b.arrayOffset() +
339             Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset +
340             SECONDARY_INDEX_ENTRY_OVERHEAD;
341         targetMidKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + keyLen);
342       } else {
343         // The middle of the root-level index.
344         targetMidKey = blockKeys[rootCount / 2];
345       }
346 
347       this.midKey.set(targetMidKey);
348       return targetMidKey;
349     }
350 
351     /**
352      * @param i from 0 to {@link #getRootBlockCount()} - 1
353      */
354     public byte[] getRootBlockKey(int i) {
355       return blockKeys[i];
356     }
357 
358     /**
359      * @param i from 0 to {@link #getRootBlockCount()} - 1
360      */
361     public long getRootBlockOffset(int i) {
362       return blockOffsets[i];
363     }
364 
365     /**
366      * @param i zero-based index of a root-level block
367      * @return the on-disk size of the root-level block for version 2, or the
368      *         uncompressed size for version 1
369      */
370     public int getRootBlockDataSize(int i) {
371       return blockDataSizes[i];
372     }
373 
374     /**
375      * @return the number of root-level blocks in this block index
376      */
377     public int getRootBlockCount() {
378       return rootCount;
379     }
380 
381     /**
382      * Finds the root-level index block containing the given key.
383      *
384      * @param key
385      *          Key to find
386      * @return Index of the root-level block containing <code>key</code>
387      *         (between 0 and the number of blocks - 1), or -1 if this file
388      *         does not contain the requested key.
389      */
390     public int rootBlockContainingKey(final byte[] key, int offset,
391         int length) {
392       int pos = Bytes.binarySearch(blockKeys, key, offset, length,
393           comparator);
394       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
395       // binarySearch's javadoc.
396 
397       if (pos >= 0) {
398         // This means this is an exact match with an element of blockKeys.
399         assert pos < blockKeys.length;
400         return pos;
401       }
402 
403       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
404       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
405       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
406       // key < blockKeys[0], meaning the file does not contain the given key.
407 
408       int i = -pos - 1;
409       assert 0 <= i && i <= blockKeys.length;
410       return i - 1;
411     }
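    // Worked example of the contract above (hypothetical keys): with
    // root-level keys {"c", "f", "m"},
    //   rootBlockContainingKey("g") returns 1, since "f" <= "g" < "m";
    //   rootBlockContainingKey("f") returns 1 (exact match);
    //   rootBlockContainingKey("a") returns -1, since "a" < "c".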
412 
413     /**
414      * Adds a new entry to the root block index. Only used when reading.
415      *
416      * @param key Last key in the block
417      * @param offset file offset where the block is stored
418      * @param dataSize the on-disk data size ({@link HFile} version 2) or the
419      *          uncompressed data size (version 1)
419      */
420     private void add(final byte[] key, final long offset, final int dataSize) {
421       blockOffsets[rootCount] = offset;
422       blockKeys[rootCount] = key;
423       blockDataSizes[rootCount] = dataSize;
424       rootCount++;
425     }
426 
427     /**
428      * The indexed key at the ith position (0-based) in the nonRootIndex.
429      * @param nonRootIndex the non-root index block buffer
430      * @param i the 0-based position
431      * @return the indexed key at position i, or null if i is out of range
432      */
433     private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
434       int numEntries = nonRootIndex.getInt(0);
435       if (i < 0 || i >= numEntries) {
436         return null;
437       }
438 
439       // Entries start after the number of entries and the secondary index.
440       // The secondary index takes numEntries + 1 ints.
441       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
442       // The target key's offset relative to the end of the secondary index
443       int targetKeyRelOffset = nonRootIndex.getInt(
444           Bytes.SIZEOF_INT * (i + 1));
445 
446       // The offset of the target key in the blockIndex buffer
447       int targetKeyOffset = entriesOffset     // Skip secondary index
448           + targetKeyRelOffset               // Skip all entries until mid
449           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
450 
451       // We subtract the two consecutive secondary index elements, which
452       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
453       // then need to subtract the overhead of offset and onDiskSize.
454       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
455         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
456 
457       int from = nonRootIndex.arrayOffset() + targetKeyOffset;
458       int to = from + targetKeyLength;
459       return Arrays.copyOfRange(nonRootIndex.array(), from, to);
460     }
461 
462     /**
463      * Performs a binary search over a non-root level index block. Utilizes the
464      * secondary index, which records the offsets of (offset, onDiskSize,
465      * firstKey) tuples of all entries.
466      *
467      * @param key the key we are searching for
469      * @param keyOffset the offset of the key in its byte array
470      * @param keyLength the length of the key
471      * @param nonRootIndex the non-root index block buffer, starting with the
472      *          secondary index. The position is ignored.
473      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
474      *         keys[i + 1], if keys is the array of all keys being searched, or
475      *         -1 if the given key is before the first key
476      * @throws IOException
477      */
478     static int binarySearchNonRootIndex(byte[] key, int keyOffset,
479         int keyLength, ByteBuffer nonRootIndex,
480         KVComparator comparator) {
481 
482       int numEntries = nonRootIndex.getInt(0);
483       int low = 0;
484       int high = numEntries - 1;
485       int mid = 0;
486 
487       // Entries start after the number of entries and the secondary index.
488       // The secondary index takes numEntries + 1 ints.
489       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
490 
491       // If we imagine that keys[-1] = -Infinity and
492       // keys[numEntries] = Infinity, then we are maintaining an invariant that
493       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
494 
495       while (low <= high) {
496         mid = (low + high) >>> 1;
497 
498         // Midkey's offset relative to the end of secondary index
499         int midKeyRelOffset = nonRootIndex.getInt(
500             Bytes.SIZEOF_INT * (mid + 1));
501 
502         // The offset of the middle key in the blockIndex buffer
503         int midKeyOffset = entriesOffset       // Skip secondary index
504             + midKeyRelOffset                  // Skip all entries until mid
505             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
506 
507         // We subtract the two consecutive secondary index elements, which
508         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
509         // then need to subtract the overhead of offset and onDiskSize.
510         int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
511             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
512 
513         // we have to compare in this order, because the comparator order
514         // has special logic when the 'left side' is a special key.
515         int cmp = comparator.compareFlatKey(key, keyOffset, keyLength,
516             nonRootIndex.array(), nonRootIndex.arrayOffset() + midKeyOffset,
517             midLength);
518 
519         // key lives above the midpoint
520         if (cmp > 0)
521           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
522         // key lives below the midpoint
523         else if (cmp < 0)
524           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
525         else
526           return mid; // exact match
527       }
528 
529       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
530       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
531       // condition, low >= high + 1. Therefore, low = high + 1.
532 
533       if (low != high + 1) {
534         throw new IllegalStateException("Binary search broken: low=" + low
535             + " " + "instead of " + (high + 1));
536       }
537 
538       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
539       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
540       int i = low - 1;
541 
542       // Some extra validation on the result.
543       if (i < -1 || i >= numEntries) {
544         throw new IllegalStateException("Binary search broken: result is " +
545             i + " but expected to be between -1 and (numEntries - 1) = " +
546             (numEntries - 1));
547       }
548 
549       return i;
550     }
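    // Worked example (hypothetical entries): for a non-root block whose entry
    // keys are {"b", "e", "k"}, this method returns
    //   1 for key "f"  (keys[1] = "e" <= "f" < keys[2] = "k"),
    //   2 for key "z"  (past the last key, so the last entry), and
    //  -1 for key "a"  (before the first key).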
551 
552     /**
553      * Search for one key using the secondary index in a non-root block. In case
554      * of success, positions the provided buffer at the entry of interest, where
555      * the file offset and the on-disk-size can be read.
556      *
557      * @param nonRootBlock a non-root block without header. Initial position
558      *          does not matter.
559      * @param key the byte array containing the key
560      * @param keyOffset the offset of the key in its byte array
561      * @param keyLength the length of the key
562      * @return the index of the entry whose key range contains the given key,
563      *         or -1 if the given key is before the first key of the block
564      *
565      */
566     static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, byte[] key,
567         int keyOffset, int keyLength, KVComparator comparator) {
568       int entryIndex = binarySearchNonRootIndex(key, keyOffset, keyLength,
569           nonRootBlock, comparator);
570 
571       if (entryIndex != -1) {
572         int numEntries = nonRootBlock.getInt(0);
573 
574         // The end of secondary index and the beginning of entries themselves.
575         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
576 
577         // The offset of the entry we are interested in relative to the end of
578         // the secondary index.
579         int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT
580             * (1 + entryIndex));
581 
582         nonRootBlock.position(entriesOffset + entryRelOffset);
583       }
584 
585       return entryIndex;
586     }
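    // Hedged usage sketch, mirroring the loop in loadDataBlockWithScanInfo
    // above: after a successful call the buffer is positioned at the entry,
    // so the caller can read the child block's location directly:
    //
    //   int entryIndex = locateNonRootIndexEntry(buf, key, off, len, cmp);
    //   if (entryIndex != -1) {
    //     long childOffset = buf.getLong();   // file offset of deeper block
    //     int childOnDiskSize = buf.getInt(); // on-disk size without header
    //   }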
587 
588     /**
589      * Read in the root-level index from the given input stream. Must match
590      * what was written into the root level by
591      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
592      * offset that function returned.
593      *
594      * @param in the buffered input stream or wrapped byte input stream
595      * @param numEntries the number of root-level index entries
596      * @throws IOException
597      */
598     public void readRootIndex(DataInput in, final int numEntries)
599         throws IOException {
600       blockOffsets = new long[numEntries];
601       blockKeys = new byte[numEntries][];
602       blockDataSizes = new int[numEntries];
603 
604       // If index size is zero, no index was written.
605       if (numEntries > 0) {
606         for (int i = 0; i < numEntries; ++i) {
607           long offset = in.readLong();
608           int dataSize = in.readInt();
609           byte[] key = Bytes.readByteArray(in);
610           add(key, offset, dataSize);
611         }
612       }
613     }
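    // On-disk root-level format consumed above, per entry (matching
    // BlockIndexChunk#writeRoot and Bytes.readByteArray):
    //   long block offset        (8 bytes)
    //   int  on-disk data size   (4 bytes)
    //   vInt key length, followed by the key bytes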
614     
615     /**
616      * Read in the root-level index from the given input stream. Must match
617      * what was written into the root level by
618      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
619      * offset that function returned.
620      *
621      * @param blk the HFile block
622      * @param numEntries the number of root-level index entries
623      * @return the input stream, positioned immediately after the root-level index
624      * @throws IOException
625      */
626     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
627       DataInputStream in = blk.getByteStream();
628       readRootIndex(in, numEntries);
629       return in;
630     }
631 
632     /**
633      * Read the root-level metadata of a multi-level block index. Based on
634      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
635      * necessary to compute the mid-key in a multi-level index.
636      *
637      * @param blk the HFile block
638      * @param numEntries the number of root-level index entries
639      * @throws IOException
640      */
641     public void readMultiLevelIndexRoot(HFileBlock blk,
642         final int numEntries) throws IOException {
643       DataInputStream in = readRootIndex(blk, numEntries);
644       // After reading the root index, subtract the trailing checksum bytes
645       // from the remaining byte count to determine whether mid-key metadata exists.
646       int checkSumBytes = blk.totalChecksumBytes();
647       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
648         // No mid-key metadata available.
649         return;
650       }
651       midLeafBlockOffset = in.readLong();
652       midLeafBlockOnDiskSize = in.readInt();
653       midKeyEntry = in.readInt();
654     }
655 
656     @Override
657     public String toString() {
658       StringBuilder sb = new StringBuilder();
659       sb.append("size=").append(rootCount).append("\n");
660       for (int i = 0; i < rootCount; i++) {
661         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
662             .append("\n  offset=").append(blockOffsets[i])
663             .append(", dataSize=").append(blockDataSizes[i]).append("\n");
664       }
665       return sb.toString();
666     }
667 
668     @Override
669     public long heapSize() {
670       long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
671           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
672 
673       // Mid-key metadata.
674       heapSize += MID_KEY_METADATA_SIZE;
675 
676       // Calculating the size of blockKeys
677       if (blockKeys != null) {
678         // Adding array + references overhead
679         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
680             * ClassSize.REFERENCE);
681 
682         // Adding bytes
683         for (byte[] key : blockKeys) {
684           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
685         }
686       }
687 
688       if (blockOffsets != null) {
689         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
690             * Bytes.SIZEOF_LONG);
691       }
692 
693       if (blockDataSizes != null) {
694         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
695             * Bytes.SIZEOF_INT);
696       }
697 
698       return ClassSize.align(heapSize);
699     }
700 
701   }
702 
703   /**
704    * Writes the block index into the output stream. Generate the tree from
705    * bottom up. The leaf level is written to disk as a sequence of inline
706    * blocks, if it is larger than a certain number of bytes. If the leaf level
707    * is not large enough, we write all entries to the root level instead.
708    *
709    * After all leaf blocks have been written, we end up with an index
710    * referencing the resulting leaf index blocks. If that index is larger than
711    * the allowed root index size, the writer will break it up into
712    * reasonable-size intermediate-level index block chunks, write those chunks
713    * out, and create another index referencing those chunks. This will be
714    * repeated until the remaining index is small enough to become the root
715    * index. However, in most practical cases we will only have leaf-level
716    * blocks and the root index, or just the root index.
717    */
718   public static class BlockIndexWriter implements InlineBlockWriter {
719     /**
720      * While the index is being written, this represents the current block
721      * index referencing all leaf blocks, with one exception. If the file is
722      * being closed and there are not enough blocks to complete even a single
723      * leaf block, no leaf blocks get written and this contains the entire
724      * block index. After all levels of the index have been written by
725      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
726      * root-level index.
727      */
728     private BlockIndexChunk rootChunk = new BlockIndexChunk();
729 
730     /**
731      * Current leaf-level chunk. New entries referencing data blocks get added
732      * to this chunk until it grows large enough to be written to disk.
733      */
734     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
735 
736     /**
737      * The number of block index levels. This is one if there is only root
738      * level (even empty), two if there are a leaf level and a root level, and is
739      * higher if there are intermediate levels. This is only final after
740      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
741      * initial value accounts for the root level, and will be increased to two
742      * as soon as we find out there is a leaf-level in
743      * {@link #blockWritten(long, int, int)}.
744      */
745     private int numLevels = 1;
746 
747     private HFileBlock.Writer blockWriter;
748     private byte[] firstKey = null;
749 
750     /**
751      * The total number of leaf-level entries, i.e. entries referenced by
752      * leaf-level blocks. For the data block index this is equal to the number
753      * of data blocks.
754      */
755     private long totalNumEntries;
756 
757     /** Total compressed size of all index blocks. */
758     private long totalBlockOnDiskSize;
759 
760     /** Total uncompressed size of all index blocks. */
761     private long totalBlockUncompressedSize;
762 
763     /** The maximum size guideline of all multi-level index blocks. */
764     private int maxChunkSize;
765 
766     /** Whether we require this block index to always be single-level. */
767     private boolean singleLevelOnly;
768 
769     /** CacheConfig, or null if cache-on-write is disabled */
770     private CacheConfig cacheConf;
771 
772     /** Name to use for computing cache keys */
773     private String nameForCaching;
774 
775     /** Creates a single-level block index writer */
776     public BlockIndexWriter() {
777       this(null, null, null);
778       singleLevelOnly = true;
779     }
780 
781     /**
782      * Creates a multi-level block index writer.
783      *
784      * @param blockWriter the block writer to use to write index blocks
785      * @param cacheConf used to determine when and how a block should be cached-on-write.
786      */
787     public BlockIndexWriter(HFileBlock.Writer blockWriter,
788         CacheConfig cacheConf, String nameForCaching) {
789       if ((cacheConf == null) != (nameForCaching == null)) {
790         throw new IllegalArgumentException("Block cache and file name for " +
791             "caching must be both specified or both null");
792       }
793 
794       this.blockWriter = blockWriter;
795       this.cacheConf = cacheConf;
796       this.nameForCaching = nameForCaching;
797       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
798     }
799 
800     public void setMaxChunkSize(int maxChunkSize) {
801       if (maxChunkSize <= 0) {
802         throw new IllegalArgumentException("Invalid maximum index block size");
803       }
804       this.maxChunkSize = maxChunkSize;
805     }
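    // Hedged lifecycle sketch (not part of the original source; variable
    // names are hypothetical). A writer such as HFileWriterV2 drives this
    // class through the InlineBlockWriter protocol, roughly:
    //
    //   BlockIndexWriter idx =
    //       new BlockIndexWriter(blockWriter, cacheConf, hfileName);
    //   // For every data block written:
    //   idx.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
    //   // Periodically, via the inline-block protocol:
    //   if (idx.shouldWriteBlock(false)) {
    //     DataOutput out = blockWriter.startWriting(idx.getInlineBlockType());
    //     idx.writeInlineBlock(out);
    //     blockWriter.writeHeaderAndData(outputStream);
    //     idx.blockWritten(blockOffset, onDiskSize, uncompressedSize);
    //   }
    //   // At close time:
    //   long rootIndexOffset = idx.writeIndexBlocks(outputStream);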
806 
807     /**
808      * Writes the root level and intermediate levels of the block index into
809      * the output stream, generating the tree from bottom up. Assumes that the
810      * leaf level has been inline-written to the disk if there is enough data
811      * for more than one leaf block. We iterate by breaking the current level
812      * of the block index, starting with the index of all leaf-level blocks,
813      * into chunks small enough to be written to disk, and generate its parent
814      * level, until we end up with a level small enough to become the root
815      * level.
816      *
817      * If the leaf level is not large enough, there is no inline block index
818      * anymore, so we only write that level of block index to disk as the root
819      * level.
820      *
821      * @param out FSDataOutputStream
822      * @return position at which we entered the root-level index.
823      * @throws IOException
824      */
825     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
826       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
827         throw new IOException("Trying to write a multi-level block index, " +
828           "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
829             "last inline chunk.");
830       }
831 
832       // We need to get mid-key metadata before we create intermediate
833       // indexes and overwrite the root chunk.
834       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
835           : null;
836 
837       if (curInlineChunk != null) {
838         while (rootChunk.getRootSize() > maxChunkSize) {
839           rootChunk = writeIntermediateLevel(out, rootChunk);
840           numLevels += 1;
841         }
842       }
843 
844       // write the root level
845       long rootLevelIndexPos = out.getPos();
846 
847       {
848         DataOutput blockStream =
849             blockWriter.startWriting(BlockType.ROOT_INDEX);
850         rootChunk.writeRoot(blockStream);
851         if (midKeyMetadata != null)
852           blockStream.write(midKeyMetadata);
853         blockWriter.writeHeaderAndData(out);
854       }
855 
856       // Add root index block size
857       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
858       totalBlockUncompressedSize +=
859           blockWriter.getUncompressedSizeWithoutHeader();
860 
861       if (LOG.isTraceEnabled()) {
862         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
863           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
864           + " root-level entries, " + totalNumEntries + " total entries, "
865           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
866           " on-disk size, "
867           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
868           " total uncompressed size.");
869       }
870       return rootLevelIndexPos;
871     }
872 
873     /**
874      * Writes the block index data as a single level only. Does not do any
875      * block framing.
876      *
877      * @param out the buffered output stream to write the index to. Typically a
878      *          stream writing into an {@link HFile} block.
879      * @param description a short description of the index being written. Used
880      *          in a log message.
881      * @throws IOException
882      */
883     public void writeSingleLevelIndex(DataOutput out, String description)
884         throws IOException {
885       expectNumLevels(1);
886 
887       if (!singleLevelOnly)
888         throw new IOException("Single-level mode is turned off");
889 
890       if (rootChunk.getNumEntries() > 0)
891         throw new IOException("Root-level entries already added in " +
892             "single-level mode");
893 
894       rootChunk = curInlineChunk;
895       curInlineChunk = new BlockIndexChunk();
896 
897       if (LOG.isTraceEnabled()) {
898         LOG.trace("Wrote a single-level " + description + " index with "
899           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
900           + " bytes");
901       }
902       rootChunk.writeRoot(out);
903     }
904 
905     /**
906      * Split the current level of the block index into intermediate index
907      * blocks of permitted size and write those blocks to disk. Return the next
908      * level of the block index referencing those intermediate-level blocks.
909      *
910      * @param out the output stream to write the intermediate-level index blocks to
911      * @param currentLevel the current level of the block index, such as a
912      *          chunk referencing all leaf-level index blocks
913      * @return the parent level block index, which becomes the root index after
914      *         a few (usually zero) iterations
915      * @throws IOException
916      */
917     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
918         BlockIndexChunk currentLevel) throws IOException {
919       // Entries referencing intermediate-level blocks we are about to create.
920       BlockIndexChunk parent = new BlockIndexChunk();
921 
922       // The current intermediate-level block index chunk.
923       BlockIndexChunk curChunk = new BlockIndexChunk();
924 
925       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
926         curChunk.add(currentLevel.getBlockKey(i),
927             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
928 
929         if (curChunk.getRootSize() >= maxChunkSize)
930           writeIntermediateBlock(out, parent, curChunk);
931       }
932 
933       if (curChunk.getNumEntries() > 0) {
934         writeIntermediateBlock(out, parent, curChunk);
935       }
936 
937       return parent;
938     }
939 
940     private void writeIntermediateBlock(FSDataOutputStream out,
941         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
942       long beginOffset = out.getPos();
943       DataOutputStream dos = blockWriter.startWriting(
944           BlockType.INTERMEDIATE_INDEX);
945       curChunk.writeNonRoot(dos);
946       byte[] curFirstKey = curChunk.getBlockKey(0);
947       blockWriter.writeHeaderAndData(out);
948 
949       if (cacheConf != null) {
950         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
951         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
952           beginOffset, DataBlockEncoding.NONE,
953           blockForCaching.getBlockType()), blockForCaching);
954       }
955 
956       // Add intermediate index block size
957       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
958       totalBlockUncompressedSize +=
959           blockWriter.getUncompressedSizeWithoutHeader();
960 
961       // OFFSET is the beginning offset of the chunk of block index entries.
962       // SIZE is the total byte size of the chunk of block index entries
963       // + the secondary index size
964       // FIRST_KEY is the first key in the chunk of block index
965       // entries.
966       parent.add(curFirstKey, beginOffset,
967           blockWriter.getOnDiskSizeWithHeader());
968 
969       // clear current block index chunk
970       curChunk.clear();
971       curFirstKey = null;
972     }
973 
974     /**
975      * @return how many block index entries there are in the root level
976      */
977     public final int getNumRootEntries() {
978       return rootChunk.getNumEntries();
979     }
980 
981     /**
982      * @return the number of levels in this block index.
983      */
984     public int getNumLevels() {
985       return numLevels;
986     }
987 
988     private void expectNumLevels(int expectedNumLevels) {
989       if (numLevels != expectedNumLevels) {
990         throw new IllegalStateException("Number of block index levels is "
991             + numLevels + " but is expected to be " + expectedNumLevels);
992       }
993     }
994 
995     /**
996      * Whether there is an inline block ready to be written. In general, we
997      * write a leaf-level index block as an inline block as soon as its size
998      * as serialized in the non-root format reaches a certain threshold.
999      */
1000     @Override
1001     public boolean shouldWriteBlock(boolean closing) {
1002       if (singleLevelOnly) {
1003         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1004       }
1005 
1006       if (curInlineChunk == null) {
1007         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1008             "called with closing=true and then called again?");
1009       }
1010 
1011       if (curInlineChunk.getNumEntries() == 0) {
1012         return false;
1013       }
1014 
1015       // We do have some entries in the current inline chunk.
1016       if (closing) {
1017         if (rootChunk.getNumEntries() == 0) {
1018           // We did not add any leaf-level blocks yet. Instead of creating a
1019           // leaf level with one block, move these entries to the root level.
1020 
1021           expectNumLevels(1);
1022           rootChunk = curInlineChunk;
1023           curInlineChunk = null;  // Disallow adding any more index entries.
1024           return false;
1025         }
1026 
1027         return true;
1028       } else {
1029         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1030       }
1031     }
1032 
1033     /**
1034      * Write out the current inline index block. Inline blocks are non-root
1035      * blocks, so the non-root index format is used.
1036      *
1037      * @param out the output stream to write the inline block to
1038      */
1039     @Override
1040     public void writeInlineBlock(DataOutput out) throws IOException {
1041       if (singleLevelOnly)
1042         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1043 
1044       // Write the inline block index to the output stream in the non-root
1045       // index block format.
1046       curInlineChunk.writeNonRoot(out);
1047 
1048       // Save the first key of the inline block so that we can add it to the
1049       // parent-level index.
1050       firstKey = curInlineChunk.getBlockKey(0);
1051 
1052       // Start a new inline index block
1053       curInlineChunk.clear();
1054     }
1055 
1056     /**
1057      * Called after an inline block has been written so that we can add an
1058      * entry referring to that block to the parent-level index.
1059      */
1060     @Override
1061     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1062       // Add leaf index block size
1063       totalBlockOnDiskSize += onDiskSize;
1064       totalBlockUncompressedSize += uncompressedSize;
1065 
1066       if (singleLevelOnly)
1067         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1068 
1069       if (firstKey == null) {
1070         throw new IllegalStateException("Trying to add second-level index " +
1071             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1072             " but the first key was not set in writeInlineBlock");
1073       }
1074 
1075       if (rootChunk.getNumEntries() == 0) {
1076         // We are writing the first leaf block, so increase index level.
1077         expectNumLevels(1);
1078         numLevels = 2;
1079       }
1080 
1081       // Add another entry to the second-level index. Include the number of
1082       // entries in all previous leaf-level chunks for mid-key calculation.
1083       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1084       firstKey = null;
1085     }
1086 
1087     @Override
1088     public BlockType getInlineBlockType() {
1089       return BlockType.LEAF_INDEX;
1090     }
1091 
1092     /**
1093      * Add one index entry to the current leaf-level block. When the leaf-level
1094      * block gets large enough, it will be flushed to disk as an inline block.
1095      *
1096      * @param firstKey the first key of the data block
1097      * @param blockOffset the offset of the data block
1098      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1099      *          format version 2), or the uncompressed size of the data block (
1100      *          {@link HFile} format version 1).
1101      */
1102     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize)
1103     {
1104       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1105       ++totalNumEntries;
1106     }
1107 
1108     /**
1109      * @throws IOException if we happened to write a multi-level index.
1110      */
1111     public void ensureSingleLevel() throws IOException {
1112       if (numLevels > 1) {
1113         throw new IOException("Wrote a " + numLevels + "-level index with " +
1114             rootChunk.getNumEntries() + " root-level entries, but " +
1115             "this is expected to be a single-level block index.");
1116       }
1117     }
1118 
1119     /**
1120      * @return true if we are using cache-on-write. This is configured by the
1121      *         caller of the constructor by either passing a valid block cache
1122      *         or null.
1123      */
1124     @Override
1125     public boolean getCacheOnWrite() {
1126       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1127     }
1128 
1129     /**
1130      * The total uncompressed size of the root index block, intermediate-level
1131      * index blocks, and leaf-level index blocks.
1132      *
1133      * @return the total uncompressed size of all index blocks
1134      */
1135     public long getTotalUncompressedSize() {
1136       return totalBlockUncompressedSize;
1137     }
1138 
1139   }
1140 
1141   /**
1142    * A single chunk of the block index in the process of writing. The data in
1143    * this chunk can become a leaf-level, intermediate-level, or root index
1144    * block.
1145    */
1146   static class BlockIndexChunk {
1147 
1148     /** First keys of the key range corresponding to each index entry. */
1149     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1150 
1151     /** Block offset in backing stream. */
1152     private final List<Long> blockOffsets = new ArrayList<Long>();
1153 
1154     /** On-disk data sizes of lower-level data or index blocks. */
1155     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1156 
1157     /**
1158      * The cumulative number of sub-entries, i.e. entries in the deeper-level
1159      * blocks. numSubEntriesAt[i] is the number of sub-entries in the
1160      * blocks corresponding to this chunk's entries #0 through #i inclusively.
1161      */
1162     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1163 
1164     /**
1165      * The offset of the next entry to be added, relative to the end of the
1166      * "secondary index" in the "non-root" format representation of this index
1167      * chunk. This is the next value to be added to the secondary index.
1168      */
1169     private int curTotalNonRootEntrySize = 0;
1170 
1171     /**
1172      * The accumulated size of this chunk if stored in the root index format.
1173      */
1174     private int curTotalRootSize = 0;
1175 
1176     /**
1177      * The "secondary index" used for binary search over variable-length
1178      * records in a "non-root" format block. These offsets are relative to the
1179      * end of this secondary index.
1180      */
1181     private final List<Integer> secondaryIndexOffsetMarks =
1182         new ArrayList<Integer>();
1183 
1184     /**
1185      * Adds a new entry to this block index chunk.
1186      *
1187      * @param firstKey the first key in the block pointed to by this entry
1188      * @param blockOffset the offset of the next-level block pointed to by this
1189      *          entry
1190      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1191      *          entry, including header size
1192      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1193      *          construction, this specifies the current total number of
1194      *          sub-entries in all leaf-level chunks, including the one
1195      *          corresponding to the second-level entry being added.
1196      */
1197     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1198         long curTotalNumSubEntries) {
1199       // Record the offset for the secondary index
1200       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1201       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1202           + firstKey.length;
1203 
1204       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1205           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1206 
1207       blockKeys.add(firstKey);
1208       blockOffsets.add(blockOffset);
1209       onDiskDataSizes.add(onDiskDataSize);
1210 
1211       if (curTotalNumSubEntries != -1) {
1212         numSubEntriesAt.add(curTotalNumSubEntries);
1213 
1214         // Make sure the parallel arrays are in sync.
1215         if (numSubEntriesAt.size() != blockKeys.size()) {
1216           throw new IllegalStateException("Only have key/value count " +
1217               "stats for " + numSubEntriesAt.size() + " block index " +
1218               "entries out of " + blockKeys.size());
1219         }
1220       }
1221     }
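    // Worked size-accounting example for the bookkeeping above: adding an
    // entry with a 10-byte first key grows
    //   curTotalNonRootEntrySize by SECONDARY_INDEX_ENTRY_OVERHEAD + 10
    //     = 12 + 10 = 22 bytes, and
    //   curTotalRootSize by 8 (offset) + 4 (size) + 1 (vInt length of 10)
    //     + 10 (key bytes) = 23 bytes.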
1222 
1223     /**
1224      * The same as {@link #add(byte[], long, int, long)} but does not take the
1225      * number of sub-entries into account. Used for single-level indexes.
1226      *
1227      * @see #add(byte[], long, int, long)
1228      */
1229     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1230       add(firstKey, blockOffset, onDiskDataSize, -1);
1231     }
1232 
1233     public void clear() {
1234       blockKeys.clear();
1235       blockOffsets.clear();
1236       onDiskDataSizes.clear();
1237       secondaryIndexOffsetMarks.clear();
1238       numSubEntriesAt.clear();
1239       curTotalNonRootEntrySize = 0;
1240       curTotalRootSize = 0;
1241     }
1242 
1243     /**
1244      * Finds the entry corresponding to the deeper-level index block containing
1245      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1246      * ordering of sub-entries.
1247      *
1248      * <p>
1249      * <i> Implementation note. </i> We are looking for i such that
1250      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1251      * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
1252      * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1253      * sub-entries. i is by definition the insertion point of k in
1254      * numSubEntriesAt.
1255      *
1256      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1257      * @return the 0-based index of the entry corresponding to the given
1258      *         sub-entry
1259      */
1260     public int getEntryBySubEntry(long k) {
1261       // We define mid-key as the key corresponding to k'th sub-entry
1262       // (0-based).
1263 
1264       int i = Collections.binarySearch(numSubEntriesAt, k);
1265 
1266       // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
1267       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1268       // is in the (i + 1)'th chunk.
1269       if (i >= 0)
1270         return i + 1;
1271 
1272       // Inexact match. Return the insertion point.
1273       return -i - 1;
1274     }
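    // Worked example (hypothetical counts): with numSubEntriesAt = {3, 7, 12},
    // chunk #0 holds sub-entries 0..2, chunk #1 holds 3..6, and chunk #2 holds
    // 7..11. Then
    //   getEntryBySubEntry(5) == 1  (inexact match, insertion point 1), and
    //   getEntryBySubEntry(7) == 2  (exact match at index 1, so i + 1 = 2).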
1275 
1276     /**
1277      * Used when writing the root block index of a multi-level block index.
1278      * Serializes additional information that allows the mid-key to be
1279      * identified efficiently.
1280      *
1281      * @return a few serialized fields for finding the mid-key
1282      * @throws IOException if the metadata for computing the mid-key could not be created
1283      */
1284     public byte[] getMidKeyMetadata() throws IOException {
1285       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1286           MID_KEY_METADATA_SIZE);
1287       DataOutputStream baosDos = new DataOutputStream(baos);
1288       // Guard against an empty chunk before dereferencing the last element.
1289       if (numSubEntriesAt.isEmpty()) {
1290         throw new IOException("No leaf-level entries, mid-key unavailable");
1291       }
1292       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1292       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1293       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1294 
1295       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1296       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1297 
1298       long numSubEntriesBefore = midKeyEntry > 0
1299           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1300       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1301       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1302       {
1303         throw new IOException("Could not identify mid-key index within the "
1304             + "leaf-level block containing mid-key: out of range ("
1305             + subEntryWithinEntry + ", numSubEntriesBefore="
1306             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1307             + ")");
1308       }
1309 
1310       baosDos.writeInt((int) subEntryWithinEntry);
1311 
1312       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1313         throw new IOException("Could not write mid-key metadata: size=" +
1314             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1315       }
1316 
1317       // Close just to be good citizens, although this has no effect.
1318       baos.close();
1319 
1320       return baos.toByteArray();
1321     }
1322 
1323     /**
1324      * Writes the block index chunk in the non-root index block format. This
1325      * format contains the number of entries, an index of integer offsets
1326      * for quick binary search on variable-length records, and tuples of
1327      * block offset, on-disk block size, and the first key for each entry.
1328      *
1329      * @param out the output stream to write the block index chunk to
1330      * @throws IOException
1331      */
1332     void writeNonRoot(DataOutput out) throws IOException {
1333       // The number of entries in the block.
1334       out.writeInt(blockKeys.size());
1335 
1336       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1337         throw new IOException("Corrupted block index chunk writer: " +
1338             blockKeys.size() + " entries but " +
1339             secondaryIndexOffsetMarks.size() + " secondary index items");
1340       }
1341 
1342       // For each entry, write a "secondary index" of relative offsets to the
1343       // entries from the end of the secondary index. This works, because at
1344       // read time we read the number of entries and know where the secondary
1345       // index ends.
1346       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1347         out.writeInt(currentSecondaryIndex);
1348 
1349       // We include one other element in the secondary index to calculate the
1350       // size of each entry more easily by subtracting secondary index elements.
1351       out.writeInt(curTotalNonRootEntrySize);
1352 
1353       for (int i = 0; i < blockKeys.size(); ++i) {
1354         out.writeLong(blockOffsets.get(i));
1355         out.writeInt(onDiskDataSizes.get(i));
1356         out.write(blockKeys.get(i));
1357       }
1358     }
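    // Byte layout produced by writeNonRoot above (n = number of entries):
    //   int      numEntries = n
    //   int[n+1] secondary index: entry offsets relative to the end of this
    //            array, plus one extra element holding the total entry size
    //   n times: long block offset, int on-disk data size, key bytes
    // getNonRootSize() below mirrors this: 4 + 4 * (n + 1) + total entry bytes.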
1359 
1360     /**
1361      * @return the size of this chunk if stored in the non-root index block
1362      *         format
1363      */
1364     int getNonRootSize() {
1365       return Bytes.SIZEOF_INT                          // Number of entries
1366           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1367           + curTotalNonRootEntrySize;                  // All entries
1368     }
1369 
1370     /**
1371      * Writes this chunk into the given output stream in the root block index
1372      * format. This format is similar to the {@link HFile} version 1 block
1373      * index format, except that we store on-disk size of the block instead of
1374      * its uncompressed size.
1375      *
1376      * @param out the data output stream to write the block index to. Typically
1377      *          a stream writing into an {@link HFile} block.
1378      * @throws IOException
1379      */
1380     void writeRoot(DataOutput out) throws IOException {
1381       for (int i = 0; i < blockKeys.size(); ++i) {
1382         out.writeLong(blockOffsets.get(i));
1383         out.writeInt(onDiskDataSizes.get(i));
1384         Bytes.writeByteArray(out, blockKeys.get(i));
1385       }
1386     }
1387 
1388     /**
1389      * @return the size of this chunk if stored in the root index block format
1390      */
1391     int getRootSize() {
1392       return curTotalRootSize;
1393     }
1394 
1395     /**
1396      * @return the number of entries in this block index chunk
1397      */
1398     public int getNumEntries() {
1399       return blockKeys.size();
1400     }
1401 
1402     public byte[] getBlockKey(int i) {
1403       return blockKeys.get(i);
1404     }
1405 
1406     public long getBlockOffset(int i) {
1407       return blockOffsets.get(i);
1408     }
1409 
1410     public int getOnDiskDataSize(int i) {
1411       return onDiskDataSizes.get(i);
1412     }
1413 
1414     public long getCumulativeNumKV(int i) {
1415       if (i < 0)
1416         return 0;
1417       return numSubEntriesAt.get(i);
1418     }
1419 
1420   }
1421 
1422   public static int getMaxChunkSize(Configuration conf) {
1423     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1424   }
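  // Hedged configuration sketch (standard Hadoop/HBase Configuration API;
  // not part of the original source): the chunk-size guideline can be
  // overridden via the key above, e.g.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
  //   int maxChunk = HFileBlockIndex.getMaxChunkSize(conf); // 65536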
1425 }