/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;

@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlock {

  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Parameters for the random KeyValue generator used by writeTestKeyValues().
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private FileSystem fs;
  private int uncompressedSizeV1;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

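  /**
   * Writes the uncompressed contents used for test blocks: 1000 ints
   * (4000 bytes) of fairly repetitive data.
   */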
  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses a bit, but not too much.
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

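  /**
   * Writes a deterministic, sorted sequence of randomly generated KeyValues
   * (optionally with tags and memstore timestamps) to the given stream.
   *
   * @return the sum of the KeyValue lengths plus the encoded memstore
   *         timestamp sizes when memstore timestamps are included
   */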
  static int writeTestKeyValues(OutputStream dos, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<KeyValue>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // Generate a fixed number of KeyValues, repeating the row, qualifier,
    // value or timestamp of an earlier entry with probability
    // CHANCE_TO_REPEAT so the data stays encoder- and compression-friendly.
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // Either generate a new random field or repeat an existing one.
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        // Use the same family for all KeyValues.
        family = keyValues.get(0).getFamily();
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = keyValues.get(
            randomizer.nextInt(keyValues.size())).getQualifier();
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new Tag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // Sort the KeyValues and write them to the stream.
    int totalSize = 0;
    Collections.sort(keyValues, KeyValue.COMPARATOR);
    DataOutputStream dataOutputStream = new DataOutputStream(dos);

    for (KeyValue kv : keyValues) {
      dataOutputStream.writeInt(kv.getKeyLength());
      dataOutputStream.writeInt(kv.getValueLength());
      dataOutputStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
      dataOutputStream.write(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
      totalSize += kv.getLength();
      if (useTag) {
        dataOutputStream.writeShort(kv.getTagsLengthUnsigned());
        dataOutputStream.write(kv.getBuffer(), kv.getTagsOffset(), kv.getTagsLengthUnsigned());
      }
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        WritableUtils.writeVLong(dataOutputStream, memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
    }
    return totalSize;
  }

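  /**
   * Creates a version 1 style test block: the META block magic record
   * followed by the test contents, passed through a compression stream for
   * the given algorithm. Returns the bytes written to the underlying buffer.
   */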
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Write the META block magic.
    writeTestBlockContents(dos);
    uncompressedSizeV1 = dos.size();
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

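  /**
   * Writes a single version 2 DATA block through an {@link HFileBlock.Writer}
   * and returns the writer, which still holds the block's header and data.
   */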
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    // Retrieving the header and data finishes up the block; the returned
    // bytes are not needed here.
    byte[] headerAndData = hbw.getHeaderAndDataForTest();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

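  /**
   * Creates a test block with the given compression and returns its
   * {@link Bytes#toStringBinary(byte[])} representation, normalizing the gzip
   * header's OS byte when the block has the expected length.
   */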
  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" byte of the gzip header to 3 (Unix) so the output
      // does not depend on the operating system the test runs on. Only do
      // this when the overall length matches; otherwise the offset may not
      // point at the OS byte at all.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);

    HFileBlock block =
        createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip stream (see RFC 1952):
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // compression method: 8 = "deflate"
            + "\\x00"  // flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS byte (0 = FAT filesystems, 3 = Unix). This byte can differ by
            // platform, so createTestBlockStr force-sets it to 3.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00";
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // Only compare the leading portion of the string representation; the
    // trailing bytes (including the checksum) may differ.
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
        testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }


  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
        is.close();
        assertEquals(0, HFile.getChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
              b.totalChecksumBytes(), -1, pread);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "On-disk size without header provided is "
                + wrongCompressedSize + ", but block header contains "
                + b.getOnDiskSizeWithoutHeader() + ".";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Tests encoding and decoding of data blocks.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder =
              new HFileDataBlockEncoderImpl(encoding);
          HFileContext meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
              .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<Integer>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
                blockId, includesMemstoreTS, HConstants.HFILEBLOCK_DUMMY_HEADER, includesTag);
            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .build();
          HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemstoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
            assertEquals(0, HFile.getChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize=" + blockUnpacked
                  .heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                  packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // Expect the two-byte big-endian encoding id at the start of the block.
              assertEquals(
                  "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                  "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // Verify that the decoded content matches what the encoder produced.
            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);

            // Verify that serializing and deserializing the block preserves its state.
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized);
              HFileBlock deserialized =
                  (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
              assertEquals(
                  "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                  blockFromHFile, deserialized);
              // Intentional reference comparison.
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.",
                    blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }

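  /**
   * Writes raw test KeyValues into the block being built (via the given
   * stream) and independently encodes the same bytes, recording the expected
   * encoded size and the encoded data section (without the header) in the
   * supplied lists for later comparison.
   */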
  static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
      DataOutputStream dos, final List<Integer> encodedSizes,
      final List<ByteBuffer> encodedBlocks, int blockId,
      boolean includesMemstoreTS, byte[] dummyHeader, boolean useTag) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DoubleOutputStream doubleOutputStream =
        new DoubleOutputStream(dos, baos);
    writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS, useTag);
    ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
    rawBuf.rewind();

    DataBlockEncoder encoder = encoding.getEncoder();
    int headerLen = dummyHeader.length;
    byte[] encodedResultWithHeader = null;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(useTag)
        .build();
    if (encoder != null) {
      HFileBlockEncodingContext encodingCtx = encoder.newDataBlockEncodingContext(encoding,
          dummyHeader, meta);
      encoder.encodeKeyValues(rawBuf, encodingCtx);
      encodedResultWithHeader =
          encodingCtx.getUncompressedBytesWithHeader();
    } else {
      HFileBlockDefaultEncodingContext defaultEncodingCtx = new HFileBlockDefaultEncodingContext(
          encoding, dummyHeader, meta);
      byte[] rawBufWithHeader =
          new byte[rawBuf.array().length + headerLen];
      System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
          headerLen, rawBuf.array().length);
      defaultEncodingCtx.compressAfterEncodingWithBlockType(rawBufWithHeader,
          BlockType.DATA);
      encodedResultWithHeader =
          defaultEncodingCtx.getUncompressedBytesWithHeader();
    }
    final int encodedSize =
        encodedResultWithHeader.length - headerLen;
    if (encoder != null) {
      // Skip the two-byte data block encoding ID that the encoder writes
      // right after the dummy header, so that only the encoded key/values
      // remain in the extracted data section.
      headerLen += DataBlockEncoding.ID_SIZE;
    }
    byte[] encodedDataSection =
        new byte[encodedResultWithHeader.length - headerLen];
    System.arraycopy(encodedResultWithHeader, headerLen,
        encodedDataSection, 0, encodedDataSection.length);
    final ByteBuffer encodedBuf =
        ByteBuffer.wrap(encodedDataSection);
    encodedSizes.add(encodedSize);
    encodedBlocks.add(encodedBuf);
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

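  /**
   * Asserts that the two buffers have identical content; on mismatch, fails
   * with the length of the common prefix and the next few bytes of each
   * buffer.
   */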
  static void assertBuffersEqual(ByteBuffer expectedBuffer,
      ByteBuffer actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Converts a few of the next bytes in the given buffer at the given
   * position to a string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuffer buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm: " + algo +
              ", pread=" + pread +
              ", cacheOnWrite=" + cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<Long>();
          List<Long> expectedPrevOffsets = new ArrayList<Long>();
          List<BlockType> expectedTypes = new ArrayList<BlockType>();
          List<ByteBuffer> expectedContents = cacheOnWrite
              ? new ArrayList<ByteBuffer>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Re-read the same block, this time passing the known on-disk
            // size, which exercises a different path in the block reader.
            HFileBlock b2 = hbr.readBlockData(curOffset,
                b.getOnDiskSizeWithHeader(), -1, pread);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // expectedContents holds the uncompressed bytes captured at
              // write time, so unpack the block before comparing.
              b = b.unpack(meta, hbr);
              // The buffer read back includes checksum bytes at the end;
              // compare only up to the data portion.
              ByteBuffer bufRead = b.getBufferWithHeader();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Only construct the (expensive) error message if it is needed.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                      HFileBlock.toStringHeader(bufExpected) +
                      "\nfound header" +
                      HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                      " limit " + bufRead.limit() +
                      " expected offset " + bufExpected.arrayOffset() +
                      " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

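  /**
   * Reads randomly selected blocks from a pre-written file for roughly ten
   * seconds, randomly mixing positional and seek-based reads and randomly
   * passing or omitting the on-disk size hint. Returns false if any read
   * fails.
   */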
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        boolean pread = rand.nextBoolean();
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
                : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread)
          ++numPositionalRead;
        if (withOnDiskSize)
          ++numWithOnDiskSize;
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
          "specified: " + numWithOnDiskSize + ")");

      return true;
    }

  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path =
          new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<Long>();
      List<BlockType> types = new ArrayList<BlockType>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(true)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressAlgo)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs =
          new ExecutorCompletionService<Boolean>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
              + " reader threads finished successfully (algo=" + compressAlgo
              + ")");
        }
      }

      is.close();
    }
  }

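  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random type and size to the
   * given path, recording their offsets, previous offsets by type, block
   * types, and (when requested) uncompressed contents in the supplied lists.
   *
   * @return the total on-disk size of all blocks written
   */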
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
      ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withCompression(compressAlgo)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        // ENCODED_DATA blocks are not written directly here; use DATA instead.
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite)
        expectedContents.add(hbw.getUncompressedBufferWithHeader());

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 64);
    } else {
      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 80);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withHBaseCheckSum(false)
          .withCompression(Algorithm.NONE)
          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, meta);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }
}