
1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
22  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
23  import static org.junit.Assert.*;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.fs.FSDataInputStream;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.fs.HFileSystem;
42  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
43  import org.apache.hadoop.hbase.io.compress.Compression;
44  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
45  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
46  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
47  import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
48  import org.apache.hadoop.hbase.testclassification.SmallTests;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ChecksumType;
51  import org.apache.hadoop.io.compress.Compressor;
52  import org.junit.Before;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  import org.junit.runner.RunWith;
56  import org.junit.runners.Parameterized;
57  import org.junit.runners.Parameterized.Parameters;
58  
59  import com.google.common.base.Preconditions;
60  
61  /**
62   * This class has unit tests to prove that older versions of
63   * HFiles (without checksums) are compatible with current readers.
64   */
65  @Category(SmallTests.class)
66  @RunWith(Parameterized.class)
67  public class TestHFileBlockCompatibility {
68    // change this value to activate more logs
69    private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
70  
71    private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
72  
73    private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
74        NONE, GZ };
75  
76   // The minor version for pre-checksum files
77    private static int MINOR_VERSION = 0;
78  
79    private static final HBaseTestingUtility TEST_UTIL =
80      new HBaseTestingUtility();
81    private HFileSystem fs;
82    private int uncompressedSizeV1;
83  
84    private final boolean includesMemstoreTS;
85    private final boolean includesTag;
86  
87    public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
88      this.includesMemstoreTS = includesMemstoreTS;
89      this.includesTag = includesTag;
90    }
91  
92    @Parameters
93    public static Collection<Object[]> parameters() {
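        // Each parameter set supplies the {includesMemstoreTS, includesTag} pair
        // consumed by the constructor above.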
94      return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
95    }
96  
97    @Before
98    public void setUp() throws IOException {
99      fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
100   }
101 
102   public byte[] createTestV1Block(Compression.Algorithm algo)
103       throws IOException {
104     Compressor compressor = algo.getCompressor();
105     ByteArrayOutputStream baos = new ByteArrayOutputStream();
106     OutputStream os = algo.createCompressionStream(baos, compressor, 0);
107     DataOutputStream dos = new DataOutputStream(os);
108     BlockType.META.write(dos); // Let's make this a meta block.
109     TestHFileBlock.writeTestBlockContents(dos);
110     uncompressedSizeV1 = dos.size();
111     dos.flush();
112     algo.returnCompressor(compressor);
113     return baos.toByteArray();
114   }
115 
116   private Writer createTestV2Block(Compression.Algorithm algo)
117       throws IOException {
118     final BlockType blockType = BlockType.DATA;
119     Writer hbw = new Writer(algo, null,
120         includesMemstoreTS, includesTag);
121     DataOutputStream dos = hbw.startWriting(blockType);
122     TestHFileBlock.writeTestBlockContents(dos);
123     // make sure the block is ready by calling hbw.getHeaderAndData()
124     hbw.getHeaderAndData();
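        // writeTestBlockContents is expected to write 1000 four-byte ints, hence
        // the 1000 * 4 uncompressed bytes asserted below.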
125     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
126     hbw.releaseCompressor();
127     return hbw;
128   }
129 
130   private String createTestBlockStr(Compression.Algorithm algo,
131       int correctLength) throws IOException {
132     Writer hbw = createTestV2Block(algo);
133     byte[] testV2Block = hbw.getHeaderAndData();
134     int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
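        // The gzip stream starts right after the 24-byte minor-version-0 block
        // header; within the gzip header the OS byte is at offset 9
        // (2 magic + 1 method + 1 flags + 4 mtime + 1 XFL), per RFC 1952.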
135     if (testV2Block.length == correctLength) {
136       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
137       // variations across operating systems.
138       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
139       testV2Block[osOffset] = 3;
140     }
141     return Bytes.toStringBinary(testV2Block);
142   }
143 
144   @Test
145   public void testNoCompression() throws IOException {
146     assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
147         getUncompressedSizeWithoutHeader());
148   }
149 
150   @Test
151   public void testGzipCompression() throws IOException {
152     final String correctTestBlockStr =
153         "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
154             + "\\xFF\\xFF\\xFF\\xFF"
155             // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
156             + "\\x1F\\x8B"  // gzip magic signature
157             + "\\x08"  // Compression method: 8 = "deflate"
158             + "\\x00"  // Flags
159             + "\\x00\\x00\\x00\\x00"  // mtime
160             + "\\x00"  // XFL (extra flags)
161             // OS (0 = FAT filesystems, 3 = Unix). However, this field
162             // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
163             + "\\x03"
164             + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
165             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
166             + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
167     final int correctGzipBlockLength = 82;
168 
169     String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
170     assertEquals(correctTestBlockStr, returnedStr);
171   }
172 
173   @Test
174   public void testReaderV2() throws IOException {
175     if(includesTag) {
176       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
177     }
178     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
179       for (boolean pread : new boolean[] { false, true }) {
180           LOG.info("testReaderV2: Compression algorithm: " + algo +
181                    ", pread=" + pread);
182         Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
183             + algo);
184         FSDataOutputStream os = fs.create(path);
185         Writer hbw = new Writer(algo, null,
186             includesMemstoreTS, includesTag);
187         long totalSize = 0;
188         for (int blockId = 0; blockId < 2; ++blockId) {
189           DataOutputStream dos = hbw.startWriting(BlockType.DATA);
190           for (int i = 0; i < 1234; ++i)
191             dos.writeInt(i);
192           hbw.writeHeaderAndData(os);
193           totalSize += hbw.getOnDiskSizeWithHeader();
194         }
195         os.close();
196 
197         FSDataInputStream is = fs.open(path);
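            // Minor version 0 blocks carry no HBase-level checksums, so the read
            // context is built with HBase checksums disabled.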
198         HFileContext meta = new HFileContextBuilder()
199                            .withHBaseCheckSum(false)
200                            .withIncludesMvcc(includesMemstoreTS)
201                            .withIncludesTags(includesTag)
202                            .withCompression(algo)
203                            .build();
204         HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
205             totalSize, fs, path, meta);
206         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
207         is.close();
208 
209         b.sanityCheck();
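            // Each block was written as 1234 four-byte ints, so we expect
            // 1234 * 4 = 4936 uncompressed bytes.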
210         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
211         assertEquals(algo == GZ ? 2173 : 4936,
212                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
213         HFileBlock expected = b;
214 
215         if (algo == GZ) {
216           is = fs.open(path);
217           hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is), totalSize, fs, path,
218               meta);
219           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
220                                 b.totalChecksumBytes(), -1, pread);
221           assertEquals(expected, b);
222           int wrongCompressedSize = 2172;
223           try {
224             b = hbr.readBlockData(0, wrongCompressedSize
225                 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
226             fail("Exception expected");
227           } catch (IOException ex) {
228             String expectedPrefix = "On-disk size without header provided is "
229                 + wrongCompressedSize + ", but block header contains "
230                 + b.getOnDiskSizeWithoutHeader() + ".";
231             assertTrue("Invalid exception message: '" + ex.getMessage()
232                 + "'.\nMessage is expected to start with: '" + expectedPrefix
233                 + "'", ex.getMessage().startsWith(expectedPrefix));
234           }
235           is.close();
236         }
237       }
238     }
239   }
240 
241   /**
242    * Test encoding/decoding data blocks.
243    * @throws IOException in case of a bug or a problem with temporary files.
244    */
245   @Test
246   public void testDataBlockEncoding() throws IOException {
247     if(includesTag) {
248       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
249     }
250     final int numBlocks = 5;
251     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
252       for (boolean pread : new boolean[] { false, true }) {
253         for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
254           LOG.info("testDataBlockEncoding algo " + algo +
255                    " pread = " + pread +
256                    " encoding " + encoding);
257           Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
258               + algo + "_" + encoding.toString());
259           FSDataOutputStream os = fs.create(path);
260           HFileDataBlockEncoder dataBlockEncoder =
261               new HFileDataBlockEncoderImpl(encoding);
262           TestHFileBlockCompatibility.Writer hbw =
263               new TestHFileBlockCompatibility.Writer(algo,
264                   dataBlockEncoder, includesMemstoreTS, includesTag);
265           long totalSize = 0;
266           final List<Integer> encodedSizes = new ArrayList<Integer>();
267           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
268           for (int blockId = 0; blockId < numBlocks; ++blockId) {
269             DataOutputStream dos = hbw.startWriting(BlockType.DATA);
270             TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
271                 encodedBlocks, blockId, includesMemstoreTS,
272                 TestHFileBlockCompatibility.Writer.DUMMY_HEADER, includesTag);
273 
274             hbw.writeHeaderAndData(os);
275             totalSize += hbw.getOnDiskSizeWithHeader();
276           }
277           os.close();
278 
279           FSDataInputStream is = fs.open(path);
280           HFileContext meta = new HFileContextBuilder()
281                               .withHBaseCheckSum(false)
282                               .withIncludesMvcc(includesMemstoreTS)
283                               .withIncludesTags(includesTag)
284                               .withCompression(algo)
285                               .build();
286           HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
287               totalSize, fs, path, meta);
288           hbr.setDataBlockEncoder(dataBlockEncoder);
289           hbr.setIncludesMemstoreTS(includesMemstoreTS);
290 
291           HFileBlock b;
292           int pos = 0;
293           for (int blockId = 0; blockId < numBlocks; ++blockId) {
294             b = hbr.readBlockData(pos, -1, -1, pread);
295             b.sanityCheck();
296             if (meta.isCompressedOrEncrypted()) {
297               assertFalse(b.isUnpacked());
298               b = b.unpack(meta, hbr);
299             }
300             pos += b.getOnDiskSizeWithHeader();
301 
302             assertEquals((int) encodedSizes.get(blockId),
303                 b.getUncompressedSizeWithoutHeader());
304             ByteBuffer actualBuffer = b.getBufferWithoutHeader();
305             if (encoding != DataBlockEncoding.NONE) {
306               // We expect a two-byte big-endian encoding id.
307               assertEquals(0, actualBuffer.get(0));
308               assertEquals(encoding.getId(), actualBuffer.get(1));
309               actualBuffer.position(2);
310               actualBuffer = actualBuffer.slice();
311             }
312 
313             ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
314             expectedBuffer.rewind();
315 
316             // test if content matches, produce nice message
317             TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
318               algo, encoding, pread);
319           }
320           is.close();
321         }
322       }
323     }
324   }
325   /**
326    * This is the version of the HFileBlock.Writer that is used to
327    * create V2 blocks with minor version 0. These blocks do not
328    * have hbase-level checksums. The code is here to test
329    * backward compatibility. We do not inherit from HFileBlock.Writer because
330    * the code in this class must never change, whereas HFileBlock.Writer will
331    * continue to evolve.
332    *
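       * <p>A minimal usage sketch, mirroring how the tests in this class drive
       * the writer (here {@code os} is an already-open {@link FSDataOutputStream}):
       * <pre>
       *   Writer hbw = new Writer(algo, null, includesMemstoreTS, includesTag);
       *   DataOutputStream dos = hbw.startWriting(BlockType.DATA);
       *   dos.writeInt(42);                // block payload
       *   hbw.writeHeaderAndData(os);      // finish the block and append it to os
       *   long onDiskSize = hbw.getOnDiskSizeWithHeader();
       *   hbw.releaseCompressor();         // return the compressor to the pool
       * </pre>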
333    */
334   public static final class Writer {
335 
336     // These constants are as they were in minorVersion 0.
337     private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
338     private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
339     private static final byte[] DUMMY_HEADER =
340       HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
341 
342     private enum State {
343       INIT,
344       WRITING,
345       BLOCK_READY
346     };
347 
348     /** Writer state. Used to ensure the correct usage protocol. */
349     private State state = State.INIT;
350 
351     /** Compression algorithm for all blocks this instance writes. */
352     private final Compression.Algorithm compressAlgo;
353 
354     /** Data block encoder used for data blocks */
355     private final HFileDataBlockEncoder dataBlockEncoder;
356 
357     private HFileBlockEncodingContext dataBlockEncodingCtx;
358     /** block encoding context for non-data blocks */
359     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
360 
361     /**
362      * The stream we use to accumulate data in uncompressed format for each
363      * block. We reset this stream at the end of each block and reuse it. The
364      * header is written as the first {@link #HEADER_SIZE} bytes into this
365      * stream.
366      */
367     private ByteArrayOutputStream baosInMemory;
368 
369     /** Compressor, which is also reused between consecutive blocks. */
370     private Compressor compressor;
371 
372     /**
373      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
374      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
375      * to {@link BlockType#ENCODED_DATA}.
376      */
377     private BlockType blockType;
378 
379     /**
380      * A stream that we write uncompressed bytes to, which compresses them and
381      * writes them to {@link #baosInMemory}.
382      */
383     private DataOutputStream userDataStream;
384 
385     /**
386      * Bytes to be written to the file system, including the header. Compressed
387      * if compression is turned on.
388      */
389     private byte[] onDiskBytesWithHeader;
390 
391     /**
392      * Valid in the {@link State#BLOCK_READY} state. Contains the header and the
393      * uncompressed (but potentially encoded, if this is a data block) bytes, so
394      * the length is {@link #getUncompressedSizeWithoutHeader()} + {@link #HEADER_SIZE}.
395      */
396     private byte[] uncompressedBytesWithHeader;
397 
398     /**
399      * Current block's start offset in the {@link HFile}. Set in
400      * {@link #writeHeaderAndData(FSDataOutputStream)}.
401      */
402     private long startOffset;
403 
404     /**
405      * Offset of previous block by block type. Updated when the next block is
406      * started.
407      */
408     private long[] prevOffsetByType;
409 
410     /** The offset of the previous block of the same type */
411     private long prevOffset;
412 
413     private HFileContext meta;
414 
415     public Writer(Compression.Algorithm compressionAlgorithm,
416           HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
417       compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
418       this.dataBlockEncoder = dataBlockEncoder != null
419           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
420 
421       meta = new HFileContextBuilder()
422               .withHBaseCheckSum(false)
423               .withIncludesMvcc(includesMemstoreTS)
424               .withIncludesTags(includesTag)
425               .withCompression(compressionAlgorithm)
426               .build();
427       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
428       dataBlockEncodingCtx =
429           this.dataBlockEncoder.newDataBlockEncodingContext(
430               DUMMY_HEADER, meta);
431       baosInMemory = new ByteArrayOutputStream();
432 
433       prevOffsetByType = new long[BlockType.values().length];
434       for (int i = 0; i < prevOffsetByType.length; ++i)
435         prevOffsetByType[i] = -1;
436 
437     }
438 
439     /**
440      * Starts writing into the block. The previous block's data is discarded.
441      *
442      * @return the stream the user can write their data into
443      * @throws IOException
444      */
445     public DataOutputStream startWriting(BlockType newBlockType)
446         throws IOException {
447       if (state == State.BLOCK_READY && startOffset != -1) {
448         // We had a previous block that was written to a stream at a specific
449         // offset. Save that offset as the last offset of a block of that type.
450         prevOffsetByType[blockType.getId()] = startOffset;
451       }
452 
453       startOffset = -1;
454       blockType = newBlockType;
455 
456       baosInMemory.reset();
457       baosInMemory.write(DUMMY_HEADER);
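          // DUMMY_HEADER only reserves the header bytes up front; the real header
          // values are filled in later by putHeader(), called from finishBlock().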
458 
459       state = State.WRITING;
460 
461       // We will compress it later in finishBlock()
462       userDataStream = new DataOutputStream(baosInMemory);
463       return userDataStream;
464     }
465 
466     /**
467      * Returns the stream for the user to write to. The block writer takes care
468      * of handling compression and buffering for caching on write. Can only be
469      * called in the "writing" state.
470      *
471      * @return the data output stream for the user to write to
472      */
473     DataOutputStream getUserDataStream() {
474       expectState(State.WRITING);
475       return userDataStream;
476     }
477 
478     /**
479      * Transitions the block writer from the "writing" state to the "block
480      * ready" state.  Does nothing if a block is already finished.
481      */
482     private void ensureBlockReady() throws IOException {
483       Preconditions.checkState(state != State.INIT,
484           "Unexpected state: " + state);
485 
486       if (state == State.BLOCK_READY)
487         return;
488 
489       // This will set state to BLOCK_READY.
490       finishBlock();
491     }
492 
493     /**
494      * An internal method that flushes the compressing stream (if using
495      * compression), serializes the header, and takes care of the separate
496      * uncompressed stream for caching on write, if applicable. Sets block
497      * write state to "block ready".
498      */
499     private void finishBlock() throws IOException {
500       userDataStream.flush();
501       // This does an array copy, so it is safe to cache this byte array.
502       uncompressedBytesWithHeader = baosInMemory.toByteArray();
503       prevOffset = prevOffsetByType[blockType.getId()];
504 
505       // We need to set state before we can package the block up for
506       // cache-on-write. In a way, the block is ready, but not yet encoded or
507       // compressed.
508       state = State.BLOCK_READY;
509       if (blockType == BlockType.DATA) {
510         encodeDataBlockForDisk();
511       } else {
512         defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
513             uncompressedBytesWithHeader, blockType);
514         onDiskBytesWithHeader =
515           defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
516       }
517 
518       // put the header for on disk bytes
519       putHeader(onDiskBytesWithHeader, 0,
520           onDiskBytesWithHeader.length,
521           uncompressedBytesWithHeader.length);
522       // set the header for the uncompressed bytes (for cache-on-write)
523       putHeader(uncompressedBytesWithHeader, 0,
524           onDiskBytesWithHeader.length,
525           uncompressedBytesWithHeader.length);
526     }
527 
528     /**
529      * Encodes this block if it is a data block and encoding is turned on in
530      * {@link #dataBlockEncoder}.
531      */
532     private void encodeDataBlockForDisk() throws IOException {
533       // do data block encoding, if data block encoder is set
534       ByteBuffer rawKeyValues =
535           ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
536               uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
537 
538       //do the encoding
539       dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
540 
541       uncompressedBytesWithHeader =
542           dataBlockEncodingCtx.getUncompressedBytesWithHeader();
543       onDiskBytesWithHeader =
544           dataBlockEncodingCtx.getOnDiskBytesWithHeader();
545       blockType = dataBlockEncodingCtx.getBlockType();
546     }
547 
548     /**
549      * Put the header into the given byte array at the given offset.
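         * The minor-version-0 header written here is 24 bytes
         * ({@link HConstants#HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM}): an 8-byte block
         * type magic, a 4-byte on-disk size without header, a 4-byte uncompressed
         * size without header, and an 8-byte previous-block offset.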
550      * @param onDiskSize size of the block on disk
551      * @param uncompressedSize size of the block after decompression (but
552      *          before optional data block decoding)
553      */
554     private void putHeader(byte[] dest, int offset, int onDiskSize,
555         int uncompressedSize) {
556       offset = blockType.put(dest, offset);
557       offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
558       offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
559       Bytes.putLong(dest, offset, prevOffset);
560     }
561 
562     /**
563      * Similar to {@link #writeHeaderAndData(DataOutputStream)}, but records
564      * the offset of this block so that it can be referenced in the next block
565      * of the same type.
566      *
567      * @param out
568      * @throws IOException
569      */
570     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
571       long offset = out.getPos();
572       if (startOffset != -1 && offset != startOffset) {
573         throw new IOException("A " + blockType + " block written to a "
574             + "stream twice, first at offset " + startOffset + ", then at "
575             + offset);
576       }
577       startOffset = offset;
578 
579       writeHeaderAndData((DataOutputStream) out);
580     }
581 
582     /**
583      * Writes the header and the compressed data of this block (or uncompressed
584      * data when not using compression) into the given stream. Can be called in
585      * the "writing" state or in the "block ready" state. If called in the
586      * "writing" state, transitions the writer to the "block ready" state.
587      *
588      * @param out the output stream to write the block to
589      * @throws IOException
590      */
591     private void writeHeaderAndData(DataOutputStream out) throws IOException {
592       ensureBlockReady();
593       out.write(onDiskBytesWithHeader);
594     }
595 
596     /**
597      * Returns the header followed by the compressed data (or uncompressed data when not
598      * using compression) as a byte array. Can be called in the "writing" state
599      * or in the "block ready" state. If called in the "writing" state,
600      * transitions the writer to the "block ready" state.
601      *
602      * @return header and data as they would be stored on disk in a byte array
603      * @throws IOException
604      */
605     public byte[] getHeaderAndData() throws IOException {
606       ensureBlockReady();
607       return onDiskBytesWithHeader;
608     }
609 
610     /**
611      * Releases the compressor used by this writer back into the compressor
612      * pool. Needs to be called before the writer is discarded.
613      */
614     public void releaseCompressor() {
615       if (compressor != null) {
616         compressAlgo.returnCompressor(compressor);
617         compressor = null;
618       }
619     }
620 
621     /**
622      * Returns the on-disk size of the data portion of the block. This is the
623      * compressed size if compression is enabled. Can only be called in the
624      * "block ready" state. Header is not compressed, and its size is not
625      * included in the return value.
626      *
627      * @return the on-disk size of the block, not including the header.
628      */
629     public int getOnDiskSizeWithoutHeader() {
630       expectState(State.BLOCK_READY);
631       return onDiskBytesWithHeader.length - HEADER_SIZE;
632     }
633 
634     /**
635      * Returns the on-disk size of the block. Can only be called in the
636      * "block ready" state.
637      *
638      * @return the on-disk size of the block ready to be written, including the
639      *         header size
640      */
641     public int getOnDiskSizeWithHeader() {
642       expectState(State.BLOCK_READY);
643       return onDiskBytesWithHeader.length;
644     }
645 
646     /**
647      * The uncompressed size of the block data. Does not include header size.
648      */
649     public int getUncompressedSizeWithoutHeader() {
650       expectState(State.BLOCK_READY);
651       return uncompressedBytesWithHeader.length - HEADER_SIZE;
652     }
653 
654     /**
655      * The uncompressed size of the block data, including header size.
656      */
657     public int getUncompressedSizeWithHeader() {
658       expectState(State.BLOCK_READY);
659       return uncompressedBytesWithHeader.length;
660     }
661 
662     /** @return true if a block is being written  */
663     public boolean isWriting() {
664       return state == State.WRITING;
665     }
666 
667     /**
668      * Returns the number of bytes written into the current block so far, or
669      * zero if not writing the block at the moment. Note that this will return
670      * zero in the "block ready" state as well.
671      *
672      * @return the number of bytes written
673      */
674     public int blockSizeWritten() {
675       if (state != State.WRITING)
676         return 0;
677       return userDataStream.size();
678     }
679 
680     /**
681      * Returns the header followed by the uncompressed data, even if using
682      * compression. This is needed for storing uncompressed blocks in the block
683      * cache. Can be called in the "writing" state or the "block ready" state.
684      *
685      * @return uncompressed block bytes for caching on write
686      */
687     private byte[] getUncompressedDataWithHeader() {
688       expectState(State.BLOCK_READY);
689 
690       return uncompressedBytesWithHeader;
691     }
692 
693     private void expectState(State expectedState) {
694       if (state != expectedState) {
695         throw new IllegalStateException("Expected state: " + expectedState +
696             ", actual state: " + state);
697       }
698     }
699 
700     /**
701      * Similar to {@link #getUncompressedDataWithHeader()} but returns a byte
702      * buffer.
703      *
704      * @return uncompressed block for caching on write in the form of a buffer
705      */
706     public ByteBuffer getUncompressedBufferWithHeader() {
707       byte[] b = getUncompressedDataWithHeader();
708       return ByteBuffer.wrap(b, 0, b.length);
709     }
710 
711     /**
712      * Takes the given {@link BlockWritable} instance, creates a new block of
713      * its appropriate type, writes the writable into this block, and flushes
714      * the block into the output stream. The writer is instructed not to buffer
715      * uncompressed bytes for cache-on-write.
716      *
717      * @param bw the block-writable object to write as a block
718      * @param out the file system output stream
719      * @throws IOException
720      */
721     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
722         throws IOException {
723       bw.writeToBlock(startWriting(bw.getBlockType()));
724       writeHeaderAndData(out);
725     }
726 
727     /**
728      * Creates a new HFileBlock.
729      */
730     public HFileBlock getBlockForCaching() {
731       HFileContext meta = new HFileContextBuilder()
732              .withHBaseCheckSum(false)
733              .withChecksumType(ChecksumType.NULL)
734              .withBytesPerCheckSum(0)
735              .build();
736       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
737           getUncompressedSizeWithoutHeader(), prevOffset,
738           getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, 
739           getOnDiskSizeWithoutHeader(), meta);
740     }
741   }
742 
743 }
744