View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.KeyValue.KVComparator;
28  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
29  import org.apache.hadoop.hbase.io.TagCompressionContext;
30  import org.apache.hadoop.hbase.io.hfile.BlockType;
31  import org.apache.hadoop.hbase.io.hfile.HFileContext;
32  import org.apache.hadoop.hbase.io.util.LRUDictionary;
33  import org.apache.hadoop.hbase.util.ByteBufferUtils;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.io.WritableUtils;
36  
37  /**
38   * Base class for all data block encoders that use a buffer.
39   */
40  @InterfaceAudience.Private
41  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
42  
43    private static int INITIAL_KEY_BUFFER_SIZE = 512;
44  
45    @Override
46    public ByteBuffer decodeKeyValues(DataInputStream source,
47        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
48      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
49        throw new IOException(this.getClass().getName() + " only accepts "
50            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
51      }
52  
53      HFileBlockDefaultDecodingContext decodingCtx =
54          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
55      if (decodingCtx.getHFileContext().isIncludesTags()
56          && decodingCtx.getHFileContext().isCompressTags()) {
57        if (decodingCtx.getTagCompressionContext() != null) {
58          // It will be overhead to create the TagCompressionContext again and again for every block
59          // decoding.
60          decodingCtx.getTagCompressionContext().clear();
61        } else {
62          try {
63            TagCompressionContext tagCompressionContext = new TagCompressionContext(
64                LRUDictionary.class, Byte.MAX_VALUE);
65            decodingCtx.setTagCompressionContext(tagCompressionContext);
66          } catch (Exception e) {
67            throw new IOException("Failed to initialize TagCompressionContext", e);
68          }
69        }
70      }
71      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
72    }
73  
  /**
   * Mutable state of a seeker positioned on a single key/value within an encoded
   * block: a private copy of the key bytes plus offsets into the underlying block
   * buffer for the value and (optionally) the tags, and the memstore timestamp.
   */
  protected static class SeekerState {
    protected int valueOffset = -1; // offset of the value in the block buffer; -1 marks an invalid state
    protected int keyLength;
    protected int valueLength;
    protected int lastCommonPrefix; // number of key bytes shared with the previous key
    protected int tagsLength = 0;
    protected int tagsOffset = -1; // offset of tags in the block buffer; -1 when tags were uncompressed into tagsBuffer
    protected int tagsCompressedLength = 0; // length of the compressed tags in the source buffer
    protected boolean uncompressTags = true; // false => reuse tagsBuffer and skip tagsCompressedLength bytes

    /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];

    protected long memstoreTS; // mvcc/memstore timestamp of the current cell
    protected int nextKvOffset; // buffer offset of the key/value following this one

    /** @return true while this state describes a real position in the block. */
    protected boolean isValid() {
      return valueOffset != -1;
    }

    /** Marks this state unusable and resets the tag-reuse bookkeeping. */
    protected void invalidate() {
      valueOffset = -1;
      tagsCompressedLength = 0;
      uncompressTags = true;
    }

    /** Grows {@link #keyBuffer} by doubling until it can hold {@link #keyLength} bytes. */
    protected void ensureSpaceForKey() {
      if (keyLength > keyBuffer.length) {
        // rare case, but we need to handle arbitrary length of key
        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
        while (keyLength > newKeyBufferLength) {
          newKeyBufferLength *= 2;
        }
        byte[] newKeyBuffer = new byte[newKeyBufferLength];
        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
        keyBuffer = newKeyBuffer;
      }
    }

    /** Grows {@link #tagsBuffer} by doubling until it can hold {@link #tagsLength} bytes. */
    protected void ensureSpaceForTags() {
      if (tagsLength > tagsBuffer.length) {
        // rare case, but we need to handle arbitrary length of tags
        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
        while (tagsLength > newTagsBufferLength) {
          newTagsBufferLength *= 2;
        }
        byte[] newTagsBuffer = new byte[newTagsBufferLength];
        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
        tagsBuffer = newTagsBuffer;
      }
    }

    /**
     * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
     * seeker to the next key/value.
     */
    protected void copyFromNext(SeekerState nextState) {
      if (keyBuffer.length != nextState.keyBuffer.length) {
        // Sizes differ: take a full private copy of the next key buffer.
        keyBuffer = nextState.keyBuffer.clone();
      } else if (!isValid()) {
        // Note: we can only call isValid before we override our state, so this
        // comes before all the assignments at the end of this method.
        System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
             nextState.keyLength);
      } else {
        // don't copy the common prefix between this key and the previous one
        System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
            keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
                - nextState.lastCommonPrefix);
      }

      valueOffset = nextState.valueOffset;
      keyLength = nextState.keyLength;
      valueLength = nextState.valueLength;
      lastCommonPrefix = nextState.lastCommonPrefix;
      nextKvOffset = nextState.nextKvOffset;
      memstoreTS = nextState.memstoreTS;
    }

  }
156 
  /**
   * Base seeker over blocks produced by a buffered data block encoder. Keeps two
   * {@link SeekerState} instances: {@code current} (always valid once a buffer is
   * set) and {@code previous} (valid only while scanning forward), which allows a
   * single step of backtracking for seek-before semantics.
   */
  protected abstract static class
      BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
    protected HFileBlockDecodingContext decodingCtx;
    protected final KVComparator comparator;
    protected final SamePrefixComparator<byte[]> samePrefixComparator;
    protected ByteBuffer currentBuffer; // the encoded block currently being scanned
    protected STATE current = createSeekerState(); // always valid
    protected STATE previous = createSeekerState(); // may not be valid
    protected TagCompressionContext tagCompressionContext = null; // non-null iff tags are dictionary-compressed

    /**
     * @param comparator key comparator; also used as the same-prefix comparator
     * @param decodingCtx decoding context; decides whether tag compression is on
     */
    public BufferedEncodedSeeker(KVComparator comparator,
        HFileBlockDecodingContext decodingCtx) {
      this.comparator = comparator;
      this.samePrefixComparator = comparator;
      this.decodingCtx = decodingCtx;
      if (decodingCtx.getHFileContext().isCompressTags()) {
        try {
          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
        } catch (Exception e) {
          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
        }
      }
    }
    
    /** @return true if cells in this file carry an mvcc/memstore timestamp. */
    protected boolean includesMvcc() {
      return this.decodingCtx.getHFileContext().isIncludesMvcc();
    }

    /** @return true if cells in this file carry tags. */
    protected boolean includesTags() {
      return this.decodingCtx.getHFileContext().isIncludesTags();
    }

    @Override
    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
      return comparator.compareFlatKey(key, offset, length,
          current.keyBuffer, 0, current.keyLength);
    }

    @Override
    public void setCurrentBuffer(ByteBuffer buffer) {
      if (this.tagCompressionContext != null) {
        // New block: the tag dictionary from the previous block no longer applies.
        this.tagCompressionContext.clear();
      }
      currentBuffer = buffer;
      decodeFirst();
      previous.invalidate();
    }

    @Override
    public ByteBuffer getKeyDeepCopy() {
      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
      return keyBuffer;
    }

    @Override
    public ByteBuffer getValueShallowCopy() {
      // Wraps the backing array of the block buffer; no bytes are copied.
      return ByteBuffer.wrap(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
    }

    @Override
    public ByteBuffer getKeyValueBuffer() {
      // Assemble a full KeyValue layout: key length, value length, key bytes,
      // value bytes, then (optionally) tags length + tags bytes.
      ByteBuffer kvBuffer = createKVBuffer();
      kvBuffer.putInt(current.keyLength);
      kvBuffer.putInt(current.valueLength);
      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
      kvBuffer.put(currentBuffer.array(),
          currentBuffer.arrayOffset() + current.valueOffset,
          current.valueLength);
      if (current.tagsLength > 0) {
        // Put short as unsigned
        kvBuffer.put((byte)(current.tagsLength >> 8 & 0xff));
        kvBuffer.put((byte)(current.tagsLength & 0xff));
        if (current.tagsOffset != -1) {
          // the offset of the tags bytes in the underlying buffer is marked. So the temp
          // buffer,tagsBuffer was not been used.
          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
              current.tagsLength);
        } else {
          // When tagsOffset is marked as -1, tag compression was present and so the tags were
          // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
        }
      }
      return kvBuffer;
    }

    /** Allocates a buffer sized to hold the current cell in KeyValue format. */
    protected ByteBuffer createKVBuffer() {
      int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
          current.valueLength, current.tagsLength);
      ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
      return kvBuffer;
    }

    @Override
    public KeyValue getKeyValue() {
      ByteBuffer kvBuf = getKeyValueBuffer();
      KeyValue kv = new KeyValue(kvBuf.array(), kvBuf.arrayOffset(), kvBuf.array().length
          - kvBuf.arrayOffset());
      kv.setMvccVersion(current.memstoreTS);
      return kv;
    }

    @Override
    public void rewind() {
      // Re-decode from the start of the block; the tag dictionary must be rebuilt.
      currentBuffer.rewind();
      if (tagCompressionContext != null) {
        tagCompressionContext.clear();
      }
      decodeFirst();
      previous.invalidate();
    }

    @Override
    public boolean next() {
      if (!currentBuffer.hasRemaining()) {
        return false;
      }
      decodeNext();
      previous.invalidate();
      return true;
    }

    /**
     * Reads the tags portion of the current cell from {@code currentBuffer},
     * either uncompressing into {@code current.tagsBuffer} (tag compression on)
     * or just recording the offset/length (tag compression off).
     */
    protected void decodeTags() {
      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
      if (tagCompressionContext != null) {
        if (current.uncompressTags) {
          // Tag compression is been used. uncompress it into tagsBuffer
          current.ensureSpaceForTags();
          try {
            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
                current.tagsBuffer, 0, current.tagsLength);
          } catch (IOException e) {
            throw new RuntimeException("Exception while uncompressing tags", e);
          }
        } else {
          // Tags were already uncompressed for this cell (see moveToPrevious());
          // just skip over the compressed bytes in the source.
          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
      } else {
        // When tag compress is not used, let us not do temp copying of tags bytes into tagsBuffer.
        // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
        current.tagsOffset = currentBuffer.position();
        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
      }
    }

    /**
     * Linear-scans forward through the block for the given key, tracking the
     * common prefix with the target to cut comparison costs. Returns 0 on an
     * exact match, 1 when positioned on the greatest key not after the target,
     * and {@link HConstants#INDEX_KEY_MAGIC} when the target sorts before the
     * first key of the block.
     */
    @Override
    public int seekToKeyInBlock(byte[] key, int offset, int length,
        boolean seekBefore) {
      int commonPrefix = 0;
      previous.invalidate();
      do {
        int comp;
        if (samePrefixComparator != null) {
          // The prefix already matched cannot exceed what the current key shares
          // with its predecessor.
          commonPrefix = Math.min(commonPrefix, current.lastCommonPrefix);

          // extend commonPrefix
          commonPrefix += ByteBufferUtils.findCommonPrefix(
              key, offset + commonPrefix, length - commonPrefix,
              current.keyBuffer, commonPrefix,
              current.keyLength - commonPrefix);

          comp = samePrefixComparator.compareIgnoringPrefix(commonPrefix, key,
              offset, length, current.keyBuffer, 0, current.keyLength);
        } else {
          comp = comparator.compareFlatKey(key, offset, length,
              current.keyBuffer, 0, current.keyLength);
        }

        if (comp == 0) { // exact match
          if (seekBefore) {
            if (!previous.isValid()) {
              // The caller (seekBefore) has to ensure that we are not at the
              // first key in the block.
              throw new IllegalStateException("Cannot seekBefore if " +
                  "positioned at the first key in the block: key=" +
                  Bytes.toStringBinary(key, offset, length));
            }
            moveToPrevious();
            return 1;
          }
          return 0;
        }

        if (comp < 0) { // already too large, check previous
          if (previous.isValid()) {
            moveToPrevious();
          } else {
            return HConstants.INDEX_KEY_MAGIC; // using optimized index key
          }
          return 1;
        }

        // move to next, if more data is available
        if (currentBuffer.hasRemaining()) {
          previous.copyFromNext(current);
          decodeNext();
        } else {
          break;
        }
      } while (true);

      // we hit the end of the block, not an exact match
      return 1;
    }

    /**
     * Steps back to the previously scanned key/value by swapping the two state
     * objects and repositioning the buffer. Only one step of backtracking is
     * supported.
     */
    private void moveToPrevious() {
      if (!previous.isValid()) {
        throw new IllegalStateException(
            "Can move back only once and not in first key in the block.");
      }

      STATE tmp = previous;
      previous = current;
      current = tmp;

      // move after last key value
      currentBuffer.position(current.nextKvOffset);
      // Already decoded the tag bytes. We cache this tags into current state and also the total
      // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
      // the tags again. This might pollute the Data Dictionary what we use for the compression.
      // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
      // 'tagsCompressedLength' bytes of source stream.
      // See in decodeTags()
      current.tagsBuffer = previous.tagsBuffer;
      current.tagsCompressedLength = previous.tagsCompressedLength;
      current.uncompressTags = false;
      previous.invalidate();
    }

    /**
     * Creates a fresh seeker state.
     * Subclasses with a custom STATE must override this factory.
     */
    @SuppressWarnings("unchecked")
    protected STATE createSeekerState() {
      // This will fail for non-default seeker state if the subclass does not
      // override this method.
      return (STATE) new SeekerState();
    }

    /** Decodes the first key/value of the block into {@code current}. */
    abstract protected void decodeFirst();
    /** Decodes the next key/value of the block into {@code current}. */
    abstract protected void decodeNext();
  }
402 
403   protected final void afterEncodingKeyValue(ByteBuffer in,
404       DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
405     if (encodingCtx.getHFileContext().isIncludesTags()) {
406       // Read short as unsigned, high byte first
407       int tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
408       ByteBufferUtils.putCompressedInt(out, tagsLength);
409       // There are some tags to be written
410       if (tagsLength > 0) {
411         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
412         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
413         // the tags using Dictionary compression in such a case
414         if (tagCompressionContext != null) {
415           tagCompressionContext.compressTags(out, in, tagsLength);
416         } else {
417           ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
418         }
419       }
420     }
421     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
422       // Copy memstore timestamp from the byte buffer to the output stream.
423       long memstoreTS = -1;
424       try {
425         memstoreTS = ByteBufferUtils.readVLong(in);
426         WritableUtils.writeVLong(out, memstoreTS);
427       } catch (IOException ex) {
428         throw new RuntimeException("Unable to copy memstore timestamp " +
429             memstoreTS + " after encoding a key/value");
430       }
431     }
432   }
433 
434   protected final void afterDecodingKeyValue(DataInputStream source,
435       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
436     if (decodingCtx.getHFileContext().isIncludesTags()) {
437       int tagsLength = ByteBufferUtils.readCompressedInt(source);
438       // Put as unsigned short
439       dest.put((byte)((tagsLength >> 8) & 0xff));
440       dest.put((byte)(tagsLength & 0xff));
441       if (tagsLength > 0) {
442         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
443         // When tag compression is been used in this file, tagCompressionContext will have a not
444         // null value passed.
445         if (tagCompressionContext != null) {
446           tagCompressionContext.uncompressTags(source, dest, tagsLength);
447         } else {
448           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
449         }
450       }
451     }
452     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
453       long memstoreTS = -1;
454       try {
455         // Copy memstore timestamp from the data input stream to the byte
456         // buffer.
457         memstoreTS = WritableUtils.readVLong(source);
458         ByteBufferUtils.writeVLong(dest, memstoreTS);
459       } catch (IOException ex) {
460         throw new RuntimeException("Unable to copy memstore timestamp " +
461             memstoreTS + " after decoding a key/value");
462       }
463     }
464   }
465 
466   @Override
467   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
468       byte[] header, HFileContext meta) {
469     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
470   }
471 
472   @Override
473   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
474     return new HFileBlockDefaultDecodingContext(meta);
475   }
476 
  /**
   * Compress KeyValues and write them to output buffer.
   * Implemented by each concrete encoding (prefix, diff, fast-diff, ...).
   * @param out Where to write compressed data.
   * @param in Source of KeyValue for compression.
   * @param encodingCtx use the Encoding ctx associated with the current block
   * @throws IOException If there is an error writing to output stream.
   */
  public abstract void internalEncodeKeyValues(DataOutputStream out,
      ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException;
486 
  /**
   * Decodes the encoded key/values read from {@code source} into a buffer.
   * Implemented by each concrete encoding.
   * @param source stream holding the encoded block payload
   * @param allocateHeaderLength bytes to reserve at the start of the result buffer
   * @param skipLastBytes trailing bytes of the stream to leave undecoded
   * @param decodingCtx decoding context for the current block
   * @throws IOException if the stream cannot be decoded
   */
  protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
      int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
      throws IOException;
490 
491   @Override
492   public void encodeKeyValues(ByteBuffer in,
493       HFileBlockEncodingContext blkEncodingCtx) throws IOException {
494     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
495       throw new IOException (this.getClass().getName() + " only accepts "
496           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
497           "encoding context.");
498     }
499 
500     HFileBlockDefaultEncodingContext encodingCtx =
501         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
502     encodingCtx.prepareEncoding();
503     DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
504     if (encodingCtx.getHFileContext().isIncludesTags()
505         && encodingCtx.getHFileContext().isCompressTags()) {
506       if (encodingCtx.getTagCompressionContext() != null) {
507         // It will be overhead to create the TagCompressionContext again and again for every block
508         // encoding.
509         encodingCtx.getTagCompressionContext().clear();
510       } else {
511         try {
512           TagCompressionContext tagCompressionContext = new TagCompressionContext(
513               LRUDictionary.class, Byte.MAX_VALUE);
514           encodingCtx.setTagCompressionContext(tagCompressionContext);
515         } catch (Exception e) {
516           throw new IOException("Failed to initialize TagCompressionContext", e);
517         }
518       }
519     }
520     internalEncodeKeyValues(dataOut, in, encodingCtx);
521     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
522       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
523     } else {
524       encodingCtx.postEncoding(BlockType.DATA);
525     }
526   }
527 
528   /**
529    * Asserts that there is at least the given amount of unfilled space
530    * remaining in the given buffer.
531    * @param out typically, the buffer we are writing to
532    * @param length the required space in the buffer
533    * @throws EncoderBufferTooSmallException If there are no enough bytes.
534    */
535   protected static void ensureSpace(ByteBuffer out, int length)
536       throws EncoderBufferTooSmallException {
537     if (out.position() + length > out.limit()) {
538       throw new EncoderBufferTooSmallException(
539           "Buffer position=" + out.position() +
540           ", buffer limit=" + out.limit() +
541           ", length to be written=" + length);
542     }
543   }
544 
545 }