/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.base.Preconditions;
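/**
 * Tests that HFile blocks written with minor version 0 (i.e. without
 * HBase-level checksums) remain readable by the current reader code. The
 * nested {@link Writer} below is a standalone copy of the old block writer
 * that produces such blocks.
 */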
@Category(SmallTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlockCompatibility {

  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);

  private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  // The minor version of the HFile format that predates HBase-level checksums.
  private static final int MINOR_VERSION = 0;

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private HFileSystem fs;
  private int uncompressedSizeV1;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = (HFileSystem) HFileSystem.get(TEST_UTIL.getConfiguration());
  }

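  /**
   * Writes a version-1 style block (a magic record followed by test contents)
   * through a compression stream and returns the resulting bytes.
   */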
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Make this a meta block.
    TestHFileBlock.writeTestBlockContents(dos);
    uncompressedSizeV1 = dos.size();
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

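  /**
   * Creates a single version-2 data block using the compatibility
   * {@link Writer} and verifies its uncompressed size.
   */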
  private Writer createTestV2Block(Compression.Algorithm algo)
      throws IOException {
    final BlockType blockType = BlockType.DATA;
    Writer hbw = new Writer(algo, null,
        includesMemstoreTS, includesTag);
    DataOutputStream dos = hbw.startWriting(blockType);
    TestHFileBlock.writeTestBlockContents(dos);
    // Calling getHeaderAndData() forces the block into the "block ready" state.
    hbw.getHeaderAndData();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.releaseCompressor();
    return hbw;
  }

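  /**
   * Serializes a test block to a binary string, normalizing the gzip "OS"
   * header byte so the result is comparable across platforms.
   */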
  private String createTestBlockStr(Compression.Algorithm algo,
      int correctLength) throws IOException {
    Writer hbw = createTestV2Block(algo);
    byte[] testV2Block = hbw.getHeaderAndData();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) so that the
      // binary string comparison in the caller is deterministic across
      // operating systems.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
        getUncompressedSizeWithoutHeader());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS field (0 = FAT filesystems, 3 = Unix). This byte varies by
            // platform, so createTestBlockStr() force-sets it to 3.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
    final int correctGzipBlockLength = 82;

    String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
    assertEquals(correctTestBlockStr, returnedStr);
  }

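  /**
   * Writes blocks with the compatibility {@link Writer} and reads them back
   * with the current {@link HFileBlock.FSReaderV2}, with and without pread.
   */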
  @Test
  public void testReaderV2() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        Writer hbw = new Writer(algo, null,
            includesMemstoreTS, includesTag);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        HFileContext meta = new HFileContextBuilder()
            .withHBaseCheckSum(false)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
            totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          // Re-read the block specifying the exact on-disk size, then verify
          // that a deliberately wrong on-disk size is rejected.
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is), totalSize, fs, path,
              meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
              b.totalChecksumBytes(), -1, pread);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

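  /**
   * Writes encoded data blocks with every {@link DataBlockEncoding}, reads
   * them back, and verifies the decoded contents block by block.
   */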
  @Test
  public void testDataBlockEncoding() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    final int numBlocks = 5;
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding algo " + algo +
              " pread = " + pread +
              " encoding " + encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder =
              new HFileDataBlockEncoderImpl(encoding);
          TestHFileBlockCompatibility.Writer hbw =
              new TestHFileBlockCompatibility.Writer(algo,
                  dataBlockEncoder, includesMemstoreTS, includesTag);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
                encodedBlocks, blockId, includesMemstoreTS,
                TestHFileBlockCompatibility.Writer.DUMMY_HEADER, includesTag);
            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(false)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo)
              .build();
          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
              totalSize, fs, path, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);

          HFileBlock b;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            b = hbr.readBlockData(pos, -1, -1, pread);
            b.sanityCheck();
            if (meta.isCompressedOrEncrypted()) {
              assertFalse(b.isUnpacked());
              b = b.unpack(meta, hbr);
            }
            pos += b.getOnDiskSizeWithHeader();

            assertEquals((int) encodedSizes.get(blockId),
                b.getUncompressedSizeWithoutHeader());
            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // The encoded block begins with a two-byte big-endian encoding
              // id; verify it and skip past it.
              assertEquals(0, actualBuffer.get(0));
              assertEquals(encoding.getId(), actualBuffer.get(1));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // Verify that the block contents read back match what the encoder
            // produced.
            TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
                algo, encoding, pread);
          }
          is.close();
        }
      }
    }
  }

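  /**
   * A minor-version-0 block writer, kept as a standalone copy of the old
   * HFileBlock.Writer so these tests can keep producing blocks without
   * HBase-level checksums even as the production writer evolves.
   */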
  public static final class Writer {

    // Header constants for minor version 0: no HBase-level checksums.
    private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
    private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
    private static final byte[] DUMMY_HEADER =
        HFileBlock.DUMMY_HEADER_NO_CHECKSUM;

    private enum State {
      INIT,
      WRITING,
      BLOCK_READY
    }

    /** Writer state. Used to ensure the correct usage protocol. */
    private State state = State.INIT;

    /** Compression algorithm for all blocks this instance writes. */
    private final Compression.Algorithm compressAlgo;

    /** Data block encoder used for data blocks. */
    private final HFileDataBlockEncoder dataBlockEncoder;

    private HFileBlockEncodingContext dataBlockEncodingCtx;

    /** Encoding context used for blocks other than data blocks. */
    private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;

    /**
     * The stream we use to accumulate data in uncompressed format for each
     * block. We reset this stream at the end of each block and reuse it. The
     * header is written as the first {@link #HEADER_SIZE} bytes into this
     * stream.
     */
    private ByteArrayOutputStream baosInMemory;

    /** Compressor, which is also reused between consecutive blocks. */
    private Compressor compressor;

    /**
     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
     * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
     * to {@link BlockType#ENCODED_DATA}.
     */
    private BlockType blockType;

    /**
     * The stream the user writes uncompressed block contents into, backed by
     * {@link #baosInMemory}. Compression and encoding happen later, in
     * {@link #finishBlock()}.
     */
    private DataOutputStream userDataStream;

    /**
     * Bytes to be written to the file system, including the header. Compressed
     * if compression is turned on.
     */
    private byte[] onDiskBytesWithHeader;

    /**
     * Valid in the "block ready" state. Contains the header and the
     * uncompressed (but potentially encoded, if this is a data block) bytes.
     */
    private byte[] uncompressedBytesWithHeader;

    /**
     * Current block's start offset in the output stream. Set in
     * {@link #writeHeaderAndData(FSDataOutputStream)}.
     */
    private long startOffset;

    /**
     * Offset of the previous block by block type. Updated when the next block
     * is started.
     */
    private long[] prevOffsetByType;

    /** The offset of the previous block of the same type. */
    private long prevOffset;

    private HFileContext meta;

    public Writer(Compression.Algorithm compressionAlgorithm,
        HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
      compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
      this.dataBlockEncoder = dataBlockEncoder != null
          ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;

      meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressAlgo)
          .build();
      defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
      dataBlockEncodingCtx =
          this.dataBlockEncoder.newDataBlockEncodingContext(
              DUMMY_HEADER, meta);
      baosInMemory = new ByteArrayOutputStream();

      prevOffsetByType = new long[BlockType.values().length];
      for (int i = 0; i < prevOffsetByType.length; ++i) {
        prevOffsetByType[i] = -1;
      }
    }
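    /**
     * Starts writing into the block. The previous block's data is discarded.
     *
     * @return the stream the user can write their data into
     */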
    public DataOutputStream startWriting(BlockType newBlockType)
        throws IOException {
      if (state == State.BLOCK_READY && startOffset != -1) {
        // We had a previous block that was written to a stream at a specific
        // offset. Save that offset as the last offset of a block of that type.
        prevOffsetByType[blockType.getId()] = startOffset;
      }

      startOffset = -1;
      blockType = newBlockType;

      baosInMemory.reset();
      baosInMemory.write(DUMMY_HEADER);

      state = State.WRITING;

      // The contents will be compressed and/or encoded in finishBlock().
      userDataStream = new DataOutputStream(baosInMemory);
      return userDataStream;
    }
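    /**
     * Returns the stream the user is writing block contents into. Can only be
     * called in the "writing" state.
     */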
    DataOutputStream getUserDataStream() {
      expectState(State.WRITING);
      return userDataStream;
    }

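    /**
     * Transitions the block writer from the "writing" state to the "block
     * ready" state. Does nothing if the block is already finished.
     */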
    private void ensureBlockReady() throws IOException {
      Preconditions.checkState(state != State.INIT,
          "Unexpected state: " + state);

      if (state == State.BLOCK_READY) {
        return;
      }

      // This will set state to BLOCK_READY.
      finishBlock();
    }

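    /**
     * Flushes the user data stream, encodes and/or compresses the block
     * contents as needed, and writes the header into both the on-disk and the
     * uncompressed byte arrays. Sets the writer state to "block ready".
     */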
    private void finishBlock() throws IOException {
      userDataStream.flush();

      uncompressedBytesWithHeader = baosInMemory.toByteArray();
      prevOffset = prevOffsetByType[blockType.getId()];

      // We need to set state before we can package the block up for
      // cache-on-write. In a way, the block is ready, but not yet encoded or
      // compressed.
      state = State.BLOCK_READY;
      if (blockType == BlockType.DATA) {
        encodeDataBlockForDisk();
      } else {
        defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
            uncompressedBytesWithHeader, blockType);
        onDiskBytesWithHeader =
            defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
      }

      // Put the header into the on-disk bytes.
      putHeader(onDiskBytesWithHeader, 0,
          onDiskBytesWithHeader.length,
          uncompressedBytesWithHeader.length);

      // Set the header for the uncompressed bytes (for cache-on-write).
      putHeader(uncompressedBytesWithHeader, 0,
          onDiskBytesWithHeader.length,
          uncompressedBytesWithHeader.length);
    }

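    /**
     * Encodes this data block with the configured data block encoder. May
     * change {@link #blockType} to {@link BlockType#ENCODED_DATA}.
     */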
    private void encodeDataBlockForDisk() throws IOException {
      // Wrap the raw key/value bytes, excluding the header.
      ByteBuffer rawKeyValues =
          ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
              uncompressedBytesWithHeader.length - HEADER_SIZE).slice();

      // Do the encoding.
      dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);

      uncompressedBytesWithHeader =
          dataBlockEncodingCtx.getUncompressedBytesWithHeader();
      onDiskBytesWithHeader =
          dataBlockEncodingCtx.getOnDiskBytesWithHeader();
      blockType = dataBlockEncodingCtx.getBlockType();
    }

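    /**
     * Puts the block header (magic record, on-disk and uncompressed data
     * sizes, and previous block offset) into the given byte array at the
     * given offset.
     */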
    private void putHeader(byte[] dest, int offset, int onDiskSize,
        int uncompressedSize) {
      offset = blockType.put(dest, offset);
      offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
      offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
      Bytes.putLong(dest, offset, prevOffset);
    }

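    /**
     * Writes the header and data into the given stream and records the
     * block's start offset so it can be referenced by the next block of the
     * same type. A block may only be written to one position in a stream.
     */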
    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
      long offset = out.getPos();
      if (startOffset != -1 && offset != startOffset) {
        throw new IOException("A " + blockType + " block written to a "
            + "stream twice, first at offset " + startOffset + ", then at "
            + offset);
      }
      startOffset = offset;

      writeHeaderAndData((DataOutputStream) out);
    }

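    /**
     * Writes the header and the compressed data of this block (or uncompressed
     * data when not using compression) into the given stream. Finishes the
     * block first if it is still being written.
     */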
    private void writeHeaderAndData(DataOutputStream out) throws IOException {
      ensureBlockReady();
      out.write(onDiskBytesWithHeader);
    }

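    /**
     * Returns the header and the compressed data (or uncompressed data when
     * not using compression) as a byte array, as they would be stored on
     * disk. Finishes the block first if it is still being written.
     */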
    public byte[] getHeaderAndData() throws IOException {
      ensureBlockReady();
      return onDiskBytesWithHeader;
    }

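    /**
     * Returns the compressor used by this writer to the compressor pool.
     * Needs to be called before the writer is discarded.
     */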
    public void releaseCompressor() {
      if (compressor != null) {
        compressAlgo.returnCompressor(compressor);
        compressor = null;
      }
    }

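    /**
     * Returns the on-disk size of the data portion of the block, excluding
     * the header. This is the compressed size if compression is enabled. Can
     * only be called in the "block ready" state.
     */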
    public int getOnDiskSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length - HEADER_SIZE;
    }

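    /**
     * Returns the on-disk size of the block, including the header. Can only
     * be called in the "block ready" state.
     */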
    public int getOnDiskSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return onDiskBytesWithHeader.length;
    }

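    /** Returns the uncompressed size of the block data, excluding the header. */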
    public int getUncompressedSizeWithoutHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length - HEADER_SIZE;
    }

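    /** Returns the uncompressed size of the block data, including the header. */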
    public int getUncompressedSizeWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader.length;
    }

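    /** @return true if a block is currently being written */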
    public boolean isWriting() {
      return state == State.WRITING;
    }

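    /**
     * Returns the number of bytes written into the current block so far, or
     * zero if not writing a block at the moment. Note that this will return
     * zero in the "block ready" state as well.
     */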
    public int blockSizeWritten() {
      if (state != State.WRITING) {
        return 0;
      }
      return userDataStream.size();
    }

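    /**
     * Returns the header followed by the uncompressed data, even when using
     * compression. This is needed for storing uncompressed blocks in the
     * block cache. Can only be called in the "block ready" state.
     */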
    private byte[] getUncompressedDataWithHeader() {
      expectState(State.BLOCK_READY);
      return uncompressedBytesWithHeader;
    }

    private void expectState(State expectedState) {
      if (state != expectedState) {
        throw new IllegalStateException("Expected state: " + expectedState +
            ", actual state: " + state);
      }
    }

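    /**
     * Returns {@link #getUncompressedDataWithHeader()} wrapped in a
     * {@link ByteBuffer}, for caching on write.
     */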
    public ByteBuffer getUncompressedBufferWithHeader() {
      byte[] b = getUncompressedDataWithHeader();
      return ByteBuffer.wrap(b, 0, b.length);
    }

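    /**
     * Takes the given {@link BlockWritable} instance, creates a new block of
     * its appropriate type, writes the writable into this block, and flushes
     * the block into the output stream.
     */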
    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
        throws IOException {
      bw.writeToBlock(startWriting(bw.getBlockType()));
      writeHeaderAndData(out);
    }

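    /**
     * Creates a new {@link HFileBlock} from the current block contents, with
     * checksums disabled, suitable for caching on write.
     */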
    public HFileBlock getBlockForCaching() {
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(false)
          .withChecksumType(ChecksumType.NULL)
          .withBytesPerCheckSum(0)
          .build();
      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
          getUncompressedSizeWithoutHeader(), prevOffset,
          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
          getOnDiskSizeWithoutHeader(), meta);
    }
  }

}