package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.io.WritableUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;

import com.google.common.annotations.VisibleForTesting;

/**
 * {@link HFile} reader for version 2.
 */
@InterfaceAudience.Private
public class HFileReaderV2 extends AbstractHFileReader {

  private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);

  /** Minor versions starting with this number support HBase-level checksums. */
  public static final int MINOR_VERSION_WITH_CHECKSUM = 1;

  /** Minor version for HFiles written without HBase-level checksums. */
  public static final int MINOR_VERSION_NO_CHECKSUM = 0;

  /** Minor version in which the fixed file trailer is serialized with protocol buffers. */
  public static final int PBUF_TRAILER_MINOR_VERSION = 2;

  /**
   * Size of the (key length, value length) tuple that prefixes each cell in a
   * data block.
   */
  public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;

  protected boolean includesMemstoreTS = false;
  protected boolean decodeMemstoreTS = false;

  protected boolean shouldIncludeMemstoreTS() {
    return includesMemstoreTS;
  }

  /** Filesystem-level block reader. */
  protected HFileBlock.FSReader fsBlockReader;

  /**
   * A "sparse lock" keyed by block offset. Its purpose is to avoid two clients
   * loading the same block from disk: all but one caller wait for the block to
   * show up in the cache.
   */
  private IdLock offsetLock = new IdLock();

  /**
   * Blocks read from the load-on-open section other than the data block index
   * root, meta index, and file info (for example, Bloom filter metadata).
   */
  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();

  /** Minimum minor version supported by this reader. */
  static final int MIN_MINOR_VERSION = 0;

  /** Maximum minor version supported by this reader. */
  static final int MAX_MINOR_VERSION = 3;

  /** Minor versions starting with this one may store a "faked" (shortened) first key in the block index. */
  static final int MINOR_VERSION_WITH_FAKED_KEY = 3;

  protected HFileContext hfileContext;

  /**
   * Opens an HFile and reads its load-on-open section: block index roots, meta
   * index, file info, and any remaining load-on-open blocks. Optionally kicks
   * off block prefetching.
   *
   * @param path path to the HFile
   * @param trailer the fixed file trailer, already read from the stream
   * @param fsdis input stream wrapper
   * @param size length of the stream
   * @param cacheConf cache configuration
   * @param hfs filesystem
   * @param conf configuration
   */
  public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
      final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
      final HFileSystem hfs, final Configuration conf) throws IOException {
    super(path, trailer, size, cacheConf, hfs, conf);
    this.conf = conf;
    trailer.expectMajorVersion(getMajorVersion());
    validateMinorVersion(path, trailer.getMinorVersion());
    this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
    HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis, fileSize, hfs, path,
        hfileContext);
    this.fsBlockReader = fsBlockReaderV2;

    // Comparator class name is stored in the trailer in version 2.
    comparator = trailer.createComparator();
    dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
        trailer.getNumDataIndexLevels(), this);
    metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
        KeyValue.RAW_COMPARATOR, 1);

    // Parse the load-on-open data.
    HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
        trailer.getLoadOnOpenDataOffset(),
        fileSize - trailer.getTrailerSize());

    // Data block index root.
    dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
        trailer.getDataIndexCount());

    // Meta block index.
    metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
        trailer.getMetaIndexCount());

    // File info.
    fileInfo = new FileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
    lastKey = fileInfo.get(FileInfo.LASTKEY);
    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
    byte [] keyValueFormatVersion =
        fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
    includesMemstoreTS = keyValueFormatVersion != null &&
        Bytes.toInt(keyValueFormatVersion) ==
            HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
    fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
    if (includesMemstoreTS) {
      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
    }

    // Read the data block encoding algorithm name from file info.
    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
    fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

    // Store all other load-on-open blocks for later consumption (e.g. Bloom filter metadata).
    HFileBlock b;
    while ((b = blockIter.nextBlock()) != null) {
      loadOnOpenBlocks.add(b);
    }

    // Prefetch file blocks upon open if requested.
    if (cacheConf.shouldPrefetchOnOpen()) {
      PrefetchExecutor.request(path, new Runnable() {
        public void run() {
          try {
            long offset = 0;
            long end = fileSize - getTrailer().getTrailerSize();
            HFileBlock prevBlock = null;
            while (offset < end) {
              if (Thread.interrupted()) {
                break;
              }
              long onDiskSize = -1;
              if (prevBlock != null) {
                onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
              }
              HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null);
              prevBlock = block;
              offset += block.getOnDiskSizeWithHeader();
            }
          } catch (IOException e) {
            // IOExceptions are probably due to region closes (relocation, etc.)
            if (LOG.isTraceEnabled()) {
              LOG.trace("Exception encountered while prefetching " + path + ":", e);
            }
          } catch (Exception e) {
            // Other exceptions are interesting.
            LOG.warn("Exception encountered while prefetching " + path + ":", e);
          } finally {
            PrefetchExecutor.complete(path);
          }
        }
      });
    }
  }

  protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
      HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
    return new HFileContextBuilder()
        .withIncludesMvcc(this.includesMemstoreTS)
        .withCompression(this.compressAlgo)
        .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
        .build();
  }

  /**
   * Create a scanner on this file. No seeks or reads are done on creation;
   * call {@link HFileScanner#seekTo()} or {@link HFileScanner#seekTo(byte[])}
   * to position the scanner and start reading. There is nothing to clean up in
   * a scanner created this way.
   *
   * @param cacheBlocks true if blocks read by this scanner should be cached
   * @param pread use positional read rather than seek+read (pread is better
   *          for random reads, seek+read is better for scanning)
   * @param isCompaction is the scanner being used for a compaction?
   * @return a scanner on this file
   */
  @Override
  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
      final boolean isCompaction) {
    if (dataBlockEncoder.useEncodedScanner()) {
      return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
          hfileContext);
    }

    return new ScannerV2(this, cacheBlocks, pread, isCompaction);
  }
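
  // A minimal usage sketch (not part of the class contract): a full-file scan using the
  // scanner returned above. The "reader" variable is assumed to be an already-opened
  // HFileReaderV2; error handling is omitted.
  //
  //   HFileScanner scanner = reader.getScanner(true, false, false);
  //   if (scanner.seekTo()) {          // position at the first cell, if any
  //     do {
  //       KeyValue kv = scanner.getKeyValue();
  //       // ... consume kv ...
  //     } while (scanner.next());      // false once the last data block is exhausted
  //   }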

  /**
   * Returns the contents of a named meta block, or null if the file has no
   * meta blocks or no block with the given name.
   *
   * @param metaBlockName name under which the meta block was written
   * @param cacheBlock add the block to the block cache if it had to be read
   * @return the block payload wrapped in a ByteBuffer, with the header skipped
   */
  @Override
  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
      throws IOException {
    if (trailer.getMetaIndexCount() == 0) {
      return null; // there are no meta blocks
    }
    if (metaBlockIndexReader == null) {
      throw new IOException("Meta index not loaded");
    }

    byte[] mbname = Bytes.toBytes(metaBlockName);
    int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
        mbname.length);
    if (block == -1)
      return null;
    long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);

    // Per meta key from any given file, synchronize reads for said block. This
    // is OK to do for meta blocks because the meta block index is always
    // single-level.
    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
      // Check the cache for the block; if found, return it.
      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
      BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
          DataBlockEncoding.NONE, BlockType.META);

      cacheBlock &= cacheConf.shouldCacheDataOnRead();
      if (cacheConf.isBlockCacheEnabled()) {
        HFileBlock cachedBlock =
            (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, false, true);
        if (cachedBlock != null) {
          assert cachedBlock.isUnpacked() : "Packed block leak.";
          // Return a distinct 'shallow copy' of the block, so the position is
          // not disturbed by other readers.
          return cachedBlock.getBufferWithoutHeader();
        }
        // Cache miss; fall through and load from disk.
      }

      HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
          blockSize, -1, true).unpack(hfileContext, fsBlockReader);

      // Cache the block if requested.
      if (cacheBlock) {
        cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
            cacheConf.isInMemory());
      }

      return metaBlock.getBufferWithoutHeader();
    }
  }
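
  // Illustrative only: fetching a named meta block that the writer stored with
  // HFile.Writer#appendMetaBlock(). The block name "MYMETA" is hypothetical and would have
  // to match whatever name the writer used.
  //
  //   ByteBuffer meta = reader.getMetaBlock("MYMETA", false);
  //   if (meta != null) {
  //     byte[] payload = new byte[meta.remaining()];
  //     meta.get(payload);   // copy out the block payload
  //   }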

  /**
   * Read in a file block. Checks the block cache first and falls back to
   * reading from the filesystem, caching the result if configured to do so.
   *
   * @param dataBlockOffset offset to read, relative to the start of the file
   * @param onDiskBlockSize on-disk size of the block, or -1 if unknown
   * @param cacheBlock whether the block should be added to the block cache
   * @param pread use positional read instead of seek+read (positional is
   *          better for random reads, seek+read is better for scanning)
   * @param isCompaction is this block being read as part of a compaction
   * @param updateCacheMetrics whether to update cache hit/miss metrics
   * @param expectedBlockType the block type we are expecting to read with this
   *          read operation, or null to read whatever block type is available
   *          and skip the check (that might reduce caching efficiency of
   *          encoded data blocks)
   * @return the block, unpacked (decompressed and decrypted) if it was read from disk
   */
  @Override
  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
      final boolean cacheBlock, boolean pread, final boolean isCompaction,
      final boolean updateCacheMetrics, BlockType expectedBlockType)
      throws IOException {
    if (dataBlockIndexReader == null) {
      throw new IOException("Block index not loaded");
    }
    if (dataBlockOffset < 0
        || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
      throw new IOException("Requested block is out of range: "
          + dataBlockOffset + ", lastDataBlockOffset: "
          + trailer.getLastDataBlockOffset());
    }

    // For any given block from any given file, synchronize reads for said
    // block (via offsetLock) so that it is loaded from disk at most once and
    // concurrent readers wait for the cached copy instead.
    BlockCacheKey cacheKey =
        new BlockCacheKey(name, dataBlockOffset,
            dataBlockEncoder.getDataBlockEncoding(),
            expectedBlockType);

    boolean useLock = false;
    IdLock.Entry lockEntry = null;
    TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
    try {
      while (true) {
        if (useLock) {
          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
        }

        // Check cache for block. If found, return it.
        if (cacheConf.isBlockCacheEnabled()) {
          // If useLock is true this is the second pass through the loop, so a
          // miss here should not be counted again in the cache metrics.
          HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
              cacheBlock, useLock, updateCacheMetrics);
          if (cachedBlock != null) {
            if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
              cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
            }
            if (Trace.isTracing()) {
              traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
            }
            assert cachedBlock.isUnpacked() : "Packed block leak.";
            if (cachedBlock.getBlockType().isData()) {
              HFile.dataBlockReadCnt.incrementAndGet();

              // Validate the encoding of cached data blocks. The encoding is
              // part of the cache key, so it is expected to match on a hit.
              if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
                throw new IOException("Cached block under key " + cacheKey + " "
                    + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                    + dataBlockEncoder.getDataBlockEncoding() + ")");
              }
            }
            return cachedBlock;
          }
          // Cache miss; carry on and load from disk.
        }
        if (!useLock) {
          // Check the cache again, this time holding the per-offset lock.
          useLock = true;
          continue;
        }
        if (Trace.isTracing()) {
          traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
        }
        // Load the block from the filesystem.
        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
            pread);
        validateBlockType(hfileBlock, expectedBlockType);
        HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
        BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

        // Cache the block if necessary.
        if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
          cacheConf.getBlockCache().cacheBlock(cacheKey,
              cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
              cacheConf.isInMemory());
        }

        if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
          HFile.dataBlockReadCnt.incrementAndGet();
        }

        return unpacked;
      }
    } finally {
      traceScope.close();
      if (lockEntry != null) {
        offsetLock.releaseLockEntry(lockEntry);
      }
    }
  }
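
  // Illustrative sketch of driving readBlock() directly: walking every block from the start
  // of the file up to the load-on-open section, much like the prefetch task in the
  // constructor. "reader" is assumed to be an open HFileReaderV2; caching and cache metrics
  // are disabled here.
  //
  //   long offset = 0;
  //   long end = reader.getTrailer().getLoadOnOpenDataOffset();
  //   HFileBlock prev = null;
  //   while (offset < end) {
  //     long onDiskSize = (prev == null) ? -1 : prev.getNextBlockOnDiskSizeWithHeader();
  //     HFileBlock b = reader.readBlock(offset, onDiskSize, false, true, false, false, null);
  //     offset += b.getOnDiskSizeWithHeader();
  //     prev = b;
  //   }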

  @Override
  public boolean hasMVCCInfo() {
    return includesMemstoreTS && decodeMemstoreTS;
  }

  /**
   * Compares the actual type of a block retrieved from cache or disk with its
   * expected type and throws an exception in case of a mismatch. Expecting a
   * DATA block and getting an ENCODED_DATA block is acceptable.
   *
   * @param block a block retrieved from cache or disk
   * @param expectedBlockType the expected block type, or null to skip the check
   */
  private void validateBlockType(HFileBlock block,
      BlockType expectedBlockType) throws IOException {
    if (expectedBlockType == null) {
      return;
    }
    BlockType actualBlockType = block.getBlockType();
    if (actualBlockType == BlockType.ENCODED_DATA &&
        expectedBlockType == BlockType.DATA) {
      // We consider DATA to match ENCODED_DATA for the purpose of this
      // verification.
      return;
    }
    if (actualBlockType != expectedBlockType) {
      throw new IOException("Expected block type " + expectedBlockType + ", " +
          "but got " + actualBlockType + ": " + block);
    }
  }

  /**
   * @return the last key in the file. May be null if the file has no entries.
   *         Note that this is not the last row key, but the byte form of the
   *         last KeyValue.
   */
  @Override
  public byte[] getLastKey() {
    return dataBlockIndexReader.isEmpty() ? null : lastKey;
  }

  /**
   * @return the midkey for this file. We work with block boundaries only, so
   *         the returned midkey is an approximation.
   */
  @Override
  public byte[] midkey() throws IOException {
    return dataBlockIndexReader.midkey();
  }

  @Override
  public void close() throws IOException {
    close(cacheConf.shouldEvictOnClose());
  }

  public void close(boolean evictOnClose) throws IOException {
    PrefetchExecutor.cancel(path);
    if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
      int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
      if (LOG.isTraceEnabled()) {
        LOG.trace("On close, file=" + name + " evicted=" + numEvicted
            + " block(s)");
      }
    }
    fsBlockReader.closeStreams();
  }
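
  // Sketch of the open/use/close lifecycle. In practice readers are usually obtained through
  // the HFile.createReader() factory rather than by constructing HFileReaderV2 directly; the
  // fs/path/cacheConf/conf variables below are assumed to exist in the caller.
  //
  //   HFile.Reader reader = HFile.createReader(fs, path, cacheConf, conf);
  //   try {
  //     HFileScanner scanner = reader.getScanner(false, true, false);
  //     // ... seek and read ...
  //   } finally {
  //     reader.close();   // cancels any pending prefetch and optionally evicts cached blocks
  //   }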

  /** Used by tests to access the underlying block reader directly, bypassing the cache. */
  @Override
  HFileBlock.FSReader getUncachedBlockReader() {
    return fsBlockReader;
  }

  /**
   * Scanner logic shared by {@link ScannerV2} and {@link EncodedScannerV2}.
   */
  protected abstract static class AbstractScannerV2
      extends AbstractHFileReader.Scanner {
    protected HFileBlock block;

    @Override
    public byte[] getNextIndexedKey() {
      return nextIndexedKey;
    }

    /*
     * The next indexed key tracks the first key of the next data block. If it
     * is HConstants.NO_NEXT_INDEXED_KEY, the current data block is the last
     * one. If it is null, it has not been loaded yet.
     */
    protected byte[] nextIndexedKey;

    public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
        final boolean pread, final boolean isCompaction) {
      super(r, cacheBlocks, pread, isCompaction);
    }

    /**
     * An internal API function. Seek to the given key, optionally rewinding to
     * the first key of the block before doing the seek.
     *
     * @param key key byte array
     * @param offset key offset in the byte array
     * @param length key length
     * @param rewind whether to rewind to the first key of the block before
     *        doing the seek. If this is false, we are assuming we never go
     *        back; otherwise the result is undefined.
     * @return -1 if the key is earlier than the first key of the file,
     *         0 if we are at the given key, 1 if we are past the given key,
     *         {@link HConstants#INDEX_KEY_MAGIC} if we are before the first
     *         key of the block but the index stored a faked key
     */
    protected int seekTo(byte[] key, int offset, int length, boolean rewind)
        throws IOException {
      HFileBlockIndex.BlockIndexReader indexReader =
          reader.getDataBlockIndexReader();
      BlockWithScanInfo blockWithScanInfo =
          indexReader.loadDataBlockWithScanInfo(key, offset, length, block,
              cacheBlocks, pread, isCompaction);
      if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
        // This happens if the key sorts before the beginning of the file.
        return -1;
      }
      return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
          blockWithScanInfo.getNextIndexedKey(), rewind, key, offset, length, false);
    }

    protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);

    protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
        throws IOException;

    @Override
    public int seekTo(byte[] key, int offset, int length) throws IOException {
      // Always rewind to the first key of the block, because the given key
      // might be before or after the current key.
      return seekTo(key, offset, length, true);
    }

    @Override
    public int reseekTo(byte[] key, int offset, int length) throws IOException {
      int compared;
      if (isSeeked()) {
        compared = compareKey(reader.getComparator(), key, offset, length);
        if (compared < 1) {
          // If the requested key is less than or equal to the current key,
          // there is nothing to do.
          return compared;
        } else {
          if (this.nextIndexedKey != null &&
              (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
               reader.getComparator().compareFlatKey(key, offset, length,
                   nextIndexedKey, 0, nextIndexedKey.length) < 0)) {
            // Keep scanning the current data block instead of querying the
            // index: either this is the last data block, or the requested key
            // sorts before the first key of the next data block.
            return loadBlockAndSeekToKey(this.block, this.nextIndexedKey,
                false, key, offset, length, false);
          }
        }
      }
      // Don't rewind on a reseek operation, because reseek implies that we are
      // always going forward in the file.
      return seekTo(key, offset, length, false);
    }
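
    // A sketch of the intended calling pattern (illustrative, not part of the API contract):
    // seekTo() positions the scanner at an arbitrary key, while reseekTo() may only be used
    // to move forward from the current position. "scanner", key1, and key2 are assumed to
    // exist in the caller, with key2 not sorting before key1.
    //
    //   int r1 = scanner.seekTo(key1, 0, key1.length);    // -1 before file, 0 exact, 1 past
    //   int r2 = scanner.reseekTo(key2, 0, key2.length);
    //   if (r2 >= 0) {
    //     KeyValue kv = scanner.getKeyValue();            // cell at or just before key2
    //   }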

    @Override
    public boolean seekBefore(byte[] key, int offset, int length)
        throws IOException {
      HFileBlock seekToBlock =
          reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
              block, cacheBlocks, pread, isCompaction);
      if (seekToBlock == null) {
        return false;
      }
      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);

      if (reader.getComparator().compareFlatKey(firstKey.array(),
          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) >= 0) {
        // The first key of this block is already >= the requested key, so the
        // cell we want lives in the previous block.
        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
        if (previousBlockOffset == -1) {
          // The key we want is the first key of the file; there is nothing before it.
          return false;
        }

        // It is important that we compute and pass onDiskSize to the block
        // reader so that it does not have to read the header separately to
        // figure out the size.
        seekToBlock = reader.readBlock(previousBlockOffset,
            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
            pread, isCompaction, true, BlockType.DATA);
      }
      byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey);
      loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, offset, length, true);
      return true;
    }

    /**
     * Scans blocks in the "scanned" section of the {@link HFile} until the
     * next data block is found.
     *
     * @return the next data block, or null if there are no more data blocks
     */
    protected HFileBlock readNextDataBlock() throws IOException {
      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
      if (block == null)
        return null;

      HFileBlock curBlock = block;

      do {
        if (curBlock.getOffset() >= lastDataBlockOffset)
          return null;

        if (curBlock.getOffset() < 0) {
          throw new IOException("Invalid block file offset: " + block);
        }

        // Read the next block without block type validation, because it might
        // turn out to be a non-data block.
        curBlock = reader.readBlock(curBlock.getOffset()
            + curBlock.getOnDiskSizeWithHeader(),
            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
            isCompaction, true, null);
      } while (!curBlock.getBlockType().isData());

      return curBlock;
    }

    /**
     * Compares the given key against the key the scanner is currently
     * positioned at within the loaded block.
     *
     * @return a negative value if the given key is smaller than the current
     *         key, zero if they are equal, and a positive value otherwise
     */
    public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
        int length);
  }

  /**
   * Implementation of {@link HFileScanner} for unencoded data blocks.
   */
  protected static class ScannerV2 extends AbstractScannerV2 {
    private HFileReaderV2 reader;

    public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
        final boolean pread, final boolean isCompaction) {
      super(r, cacheBlocks, pread, isCompaction);
      this.reader = r;
    }

    @Override
    public KeyValue getKeyValue() {
      if (!isSeeked())
        return null;

      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
          + blockBuffer.position(), getCellBufSize());
      if (this.reader.shouldIncludeMemstoreTS()) {
        ret.setMvccVersion(currMemstoreTS);
      }
      return ret;
    }

    protected int getCellBufSize() {
      return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
    }

    @Override
    public ByteBuffer getKey() {
      assertSeeked();
      return ByteBuffer.wrap(
          blockBuffer.array(),
          blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
          blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
    }

    @Override
    public ByteBuffer getValue() {
      assertSeeked();
      return ByteBuffer.wrap(
          blockBuffer.array(),
          blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
    }

    protected void setNonSeekedState() {
      block = null;
      blockBuffer = null;
      currKeyLen = 0;
      currValueLen = 0;
      currMemstoreTS = 0;
      currMemstoreTSLen = 0;
    }

    /**
     * Go to the next key/value in the block section. Loads the next block if
     * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
     * be called.
     *
     * @return true if successfully navigated to the next key/value
     */
    @Override
    public boolean next() throws IOException {
      assertSeeked();

      try {
        blockBuffer.position(getNextCellStartPosition());
      } catch (IllegalArgumentException e) {
        LOG.error("Current pos = " + blockBuffer.position()
            + "; currKeyLen = " + currKeyLen + "; currValLen = "
            + currValueLen + "; block limit = " + blockBuffer.limit()
            + "; HFile name = " + reader.getName()
            + "; currBlock currBlockOffset = " + block.getOffset());
        throw e;
      }

      if (blockBuffer.remaining() <= 0) {
        long lastDataBlockOffset =
            reader.getTrailer().getLastDataBlockOffset();

        if (block.getOffset() >= lastDataBlockOffset) {
          setNonSeekedState();
          return false;
        }

        // Read the next data block.
        HFileBlock nextBlock = readNextDataBlock();
        if (nextBlock == null) {
          setNonSeekedState();
          return false;
        }

        updateCurrBlock(nextBlock);
        return true;
      }

      // We are still in the same block.
      readKeyValueLen();
      return true;
    }

    protected int getNextCellStartPosition() {
      return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
          + currMemstoreTSLen;
    }

    /**
     * Positions this scanner at the start of the file.
     *
     * @return false if the file is empty; i.e. a call to next would return
     *         false and the current key and value are undefined.
     */
    @Override
    public boolean seekTo() throws IOException {
      if (reader == null) {
        return false;
      }

      if (reader.getTrailer().getEntryCount() == 0) {
        // No data blocks.
        return false;
      }

      long firstDataBlockOffset =
          reader.getTrailer().getFirstDataBlockOffset();
      if (block != null && block.getOffset() == firstDataBlockOffset) {
        blockBuffer.rewind();
        readKeyValueLen();
        return true;
      }

      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
          isCompaction, true, BlockType.DATA);
      if (block.getOffset() < 0) {
        throw new IOException("Invalid block offset: " + block.getOffset());
      }
      updateCurrBlock(block);
      return true;
    }

    @Override
    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
        throws IOException {
      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
        updateCurrBlock(seekToBlock);
      } else if (rewind) {
        blockBuffer.rewind();
      }

      // Update the nextIndexedKey.
      this.nextIndexedKey = nextIndexedKey;
      return blockSeek(key, offset, length, seekBefore);
    }

    /**
     * Updates the current block to be the given {@link HFileBlock} and seeks
     * to its first key/value pair.
     *
     * @param newBlock the block to make current
     */
    protected void updateCurrBlock(HFileBlock newBlock) {
      block = newBlock;

      // Sanity check.
      if (block.getBlockType() != BlockType.DATA) {
        throw new IllegalStateException("ScannerV2 works only on data " +
            "blocks, got " + block.getBlockType() + "; " +
            "fileName=" + reader.name + ", " +
            "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
            "isCompaction=" + isCompaction);
      }

      blockBuffer = block.getBufferWithoutHeader();
      readKeyValueLen();
      blockFetches++;

      // Reset the next indexed key.
      this.nextIndexedKey = null;
    }

    protected void readKeyValueLen() {
      blockBuffer.mark();
      currKeyLen = blockBuffer.getInt();
      currValueLen = blockBuffer.getInt();
      ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
      readMvccVersion();
      if (currKeyLen < 0 || currValueLen < 0
          || currKeyLen > blockBuffer.limit()
          || currValueLen > blockBuffer.limit()) {
        throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
            + " or currValueLen " + currValueLen + ". Block offset: "
            + block.getOffset() + ", block length: " + blockBuffer.limit()
            + ", position: " + blockBuffer.position() + " (without header).");
      }
      blockBuffer.reset();
    }

    protected void readMvccVersion() {
      if (this.reader.shouldIncludeMemstoreTS()) {
        if (this.reader.decodeMemstoreTS) {
          currMemstoreTS = Bytes.readAsVLong(blockBuffer.array(), blockBuffer.arrayOffset()
              + blockBuffer.position());
          currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
        } else {
          currMemstoreTS = 0;
          currMemstoreTSLen = 1;
        }
      }
    }
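
    // Illustrative sketch (not used by this class): manually decoding one cell at the current
    // buffer position, mirroring what readKeyValueLen() and readMvccVersion() do above. Here
    // "buf" stands for a duplicate of blockBuffer and "withMvcc" for shouldIncludeMemstoreTS();
    // both names are assumptions of the sketch.
    //
    //   int klen = buf.getInt();            // key length
    //   int vlen = buf.getInt();            // value length
    //   byte[] k = new byte[klen];
    //   buf.get(k);                         // key bytes
    //   byte[] v = new byte[vlen];
    //   buf.get(v);                         // value bytes
    //   long mvcc = withMvcc
    //       ? Bytes.readAsVLong(buf.array(), buf.arrayOffset() + buf.position())
    //       : 0L;                           // optional trailing memstoreTS (vlong)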

    /**
     * Within the loaded block, seek to the last key that is smaller than or
     * equal to the key we are interested in.
     *
     * A note on seekBefore: if seekBefore is true and the first key in the
     * block equals the given key, an exception is thrown. The caller has to
     * check for that case and load the previous block as appropriate.
     *
     * @param key the key to find
     * @param offset key offset in the byte array
     * @param length key length
     * @param seekBefore position at the key before the given key in case of an
     *          exact match
     * @return 0 in case of an exact key match, 1 in case of an inexact match,
     *         {@link HConstants#INDEX_KEY_MAGIC} in case of an inexact match
     *         where the input key is also less than the first key of the
     *         current block (which can happen when the index stored a faked key)
     */
    protected int blockSeek(byte[] key, int offset, int length,
        boolean seekBefore) {
      int klen, vlen;
      long memstoreTS = 0;
      int memstoreTSLen = 0;
      int lastKeyValueSize = -1;
      do {
        blockBuffer.mark();
        klen = blockBuffer.getInt();
        vlen = blockBuffer.getInt();
        blockBuffer.reset();
        if (this.reader.shouldIncludeMemstoreTS()) {
          if (this.reader.decodeMemstoreTS) {
            int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
                + KEY_VALUE_LEN_SIZE + klen + vlen;
            memstoreTS = Bytes.readAsVLong(blockBuffer.array(), memstoreTSOffset);
            memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
          } else {
            memstoreTS = 0;
            memstoreTSLen = 1;
          }
        }

        int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE;
        int comp = reader.getComparator().compareFlatKey(key, offset, length,
            blockBuffer.array(), keyOffset, klen);

        if (comp == 0) {
          if (seekBefore) {
            if (lastKeyValueSize < 0) {
              throw new IllegalStateException("blockSeek with seekBefore "
                  + "at the first key of the block: key="
                  + Bytes.toStringBinary(key) + ", blockOffset="
                  + block.getOffset() + ", onDiskSize="
                  + block.getOnDiskSizeWithHeader());
            }
            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
            readKeyValueLen();
            return 1; // non-exact match
          }
          currKeyLen = klen;
          currValueLen = vlen;
          if (this.reader.shouldIncludeMemstoreTS()) {
            currMemstoreTS = memstoreTS;
            currMemstoreTSLen = memstoreTSLen;
          }
          return 0; // exact match
        } else if (comp < 0) {
          if (lastKeyValueSize > 0)
            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
          readKeyValueLen();
          if (lastKeyValueSize == -1 && blockBuffer.position() == 0
              && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
            return HConstants.INDEX_KEY_MAGIC;
          }
          return 1;
        }

        // The size of this key/value tuple, including the key/value length fields.
        lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
        blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
      } while (blockBuffer.remaining() > 0);

      // Seek back to the last key we successfully read. This happens if the
      // requested key sorts after the last key/value pair of the block, in
      // which case the following call to next() has to load the next block.
      blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
      readKeyValueLen();
      return 1; // didn't find it exactly
    }

    @Override
    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
      ByteBuffer buffer = curBlock.getBufferWithoutHeader();
      // It is safe to manipulate this buffer because we own the buffer object.
      buffer.rewind();
      int klen = buffer.getInt();
      buffer.getInt(); // skip the value length
      ByteBuffer keyBuff = buffer.slice();
      keyBuff.limit(klen);
      keyBuff.rewind();
      return keyBuff;
    }

    @Override
    public String getKeyString() {
      return Bytes.toStringBinary(blockBuffer.array(),
          blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE, currKeyLen);
    }

    @Override
    public String getValueString() {
      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
          currValueLen);
    }
  }

  /**
   * Scanner that operates on encoded data blocks.
   */
  protected static class EncodedScannerV2 extends AbstractScannerV2 {
    private final HFileBlockDecodingContext decodingCtx;
    private final DataBlockEncoder.EncodedSeeker seeker;
    private final DataBlockEncoder dataBlockEncoder;
    protected final HFileContext meta;

    public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
        boolean pread, boolean isCompaction, HFileContext meta) {
      super(reader, cacheBlocks, pread, isCompaction);
      DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding();
      dataBlockEncoder = encoding.getEncoder();
      decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
      seeker = dataBlockEncoder.createSeeker(
          reader.getComparator(), decodingCtx);
      this.meta = meta;
    }

    @Override
    public boolean isSeeked() {
      return this.block != null;
    }

    /**
     * Updates the current block to be the given {@link HFileBlock} and seeks
     * to its first key/value pair.
     *
     * @param newBlock the block to make current
     * @throws CorruptHFileException if the block's encoding does not match the
     *           encoder configured for this scanner
     */
    private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
      block = newBlock;

      // Sanity checks.
      if (block.getBlockType() != BlockType.ENCODED_DATA) {
        throw new IllegalStateException(
            "EncodedScanner works only on encoded data blocks");
      }
      short dataBlockEncoderId = block.getDataBlockEncodingId();
      if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
        String encoderCls = dataBlockEncoder.getClass().getName();
        throw new CorruptHFileException("Encoder " + encoderCls
            + " doesn't support data block encoding "
            + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
      }

      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
      blockFetches++;

      // Reset the next indexed key.
      this.nextIndexedKey = null;
    }

    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
      ByteBuffer origBlock = newBlock.getBufferReadOnly();
      // Skip the block header and the encoding id to get at the encoded payload.
      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
          origBlock.arrayOffset() + newBlock.headerSize() +
              DataBlockEncoding.ID_SIZE,
          newBlock.getUncompressedSizeWithoutHeader() -
              DataBlockEncoding.ID_SIZE).slice();
      return encodedBlock;
    }

    @Override
    public boolean seekTo() throws IOException {
      if (reader == null) {
        return false;
      }

      if (reader.getTrailer().getEntryCount() == 0) {
        // No data blocks.
        return false;
      }

      long firstDataBlockOffset =
          reader.getTrailer().getFirstDataBlockOffset();
      if (block != null && block.getOffset() == firstDataBlockOffset) {
        seeker.rewind();
        return true;
      }

      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
          isCompaction, true, BlockType.DATA);
      if (block.getOffset() < 0) {
        throw new IOException("Invalid block offset: " + block.getOffset());
      }
      updateCurrentBlock(block);
      return true;
    }

    @Override
    public boolean next() throws IOException {
      boolean isValid = seeker.next();
      if (!isValid) {
        block = readNextDataBlock();
        isValid = block != null;
        if (isValid) {
          updateCurrentBlock(block);
        }
      }
      return isValid;
    }

    @Override
    public ByteBuffer getKey() {
      assertValidSeek();
      return seeker.getKeyDeepCopy();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return seeker.compareKey(comparator, key, offset, length);
    }

    @Override
    public ByteBuffer getValue() {
      assertValidSeek();
      return seeker.getValueShallowCopy();
    }

    @Override
    public KeyValue getKeyValue() {
      if (block == null) {
        return null;
      }
      return seeker.getKeyValue();
    }

    @Override
    public String getKeyString() {
      ByteBuffer keyBuffer = getKey();
      return Bytes.toStringBinary(keyBuffer.array(),
          keyBuffer.arrayOffset(), keyBuffer.limit());
    }

    @Override
    public String getValueString() {
      ByteBuffer valueBuffer = getValue();
      return Bytes.toStringBinary(valueBuffer.array(),
          valueBuffer.arrayOffset(), valueBuffer.limit());
    }

    private void assertValidSeek() {
      if (block == null) {
        throw new NotSeekedException();
      }
    }

    @Override
    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
      return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
    }

    @Override
    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey,
        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
        throws IOException {
      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
        updateCurrentBlock(seekToBlock);
      } else if (rewind) {
        seeker.rewind();
      }
      this.nextIndexedKey = nextIndexedKey;
      return seeker.seekToKeyInBlock(key, offset, length, seekBefore);
    }
  }

  /**
   * Returns a buffer with the Bloom filter metadata. The caller takes
   * ownership of the buffer.
   */
  @Override
  public DataInput getGeneralBloomFilterMetadata() throws IOException {
    return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public DataInput getDeleteBloomFilterMetadata() throws IOException {
    return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private DataInput getBloomFilterMetadata(BlockType blockType)
      throws IOException {
    if (blockType != BlockType.GENERAL_BLOOM_META &&
        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
      throw new RuntimeException("Block Type: " + blockType.toString() +
          " is not supported");
    }

    for (HFileBlock b : loadOnOpenBlocks)
      if (b.getBlockType() == blockType)
        return b.getByteStream();
    return null;
  }

  @Override
  public boolean isFileInfoLoaded() {
    return true; // We load file info in the constructor in version 2.
  }

  /**
   * Validates that the minor version is within acceptable limits; otherwise
   * throws a RuntimeException.
   */
  private void validateMinorVersion(Path path, int minorVersion) {
    if (minorVersion < MIN_MINOR_VERSION ||
        minorVersion > MAX_MINOR_VERSION) {
      String msg = "Minor version for path " + path +
          " is expected to be between " +
          MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
          " but is found to be " + minorVersion;
      LOG.error(msg);
      throw new RuntimeException(msg);
    }
  }

  @Override
  public int getMajorVersion() {
    return 2;
  }

  @Override
  public HFileContext getFileContext() {
    return hfileContext;
  }

  /**
   * Returns false if block prefetching was requested for this file and has not
   * completed, true otherwise.
   */
  @VisibleForTesting
  boolean prefetchComplete() {
    return PrefetchExecutor.isCompleted(path);
  }
}