View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.nio.ByteBuffer;
26  
27  import org.apache.commons.io.IOUtils;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configurable;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.CellScanner;
33  import org.apache.hadoop.hbase.HBaseIOException;
34  import org.apache.hadoop.hbase.codec.Codec;
35  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
36  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
37  import org.apache.hadoop.hbase.io.HeapSize;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.ClassSize;
40  import org.apache.hadoop.io.compress.CodecPool;
41  import org.apache.hadoop.io.compress.CompressionCodec;
42  import org.apache.hadoop.io.compress.CompressionInputStream;
43  import org.apache.hadoop.io.compress.Compressor;
44  import org.apache.hadoop.io.compress.Decompressor;
45  
46  import com.google.common.base.Preconditions;
47  import com.google.protobuf.CodedOutputStream;
48  import com.google.protobuf.Message;
49  
50  /**
51   * Utility to help ipc'ing.
52   */
53  class IPCUtil {
54    public static final Log LOG = LogFactory.getLog(IPCUtil.class);
55    /**
56     * How much we think the decompressor will expand the original compressed content.
57     */
58    private final int cellBlockDecompressionMultiplier;
59    private final int cellBlockBuildingInitialBufferSize;
60    private final Configuration conf;
61  
62    IPCUtil(final Configuration conf) {
63      super();
64      this.conf = conf;
65      this.cellBlockDecompressionMultiplier =
66          conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
67  
68      // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
69      // #buildCellBlock.
70      this.cellBlockBuildingInitialBufferSize =
71        ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
72    }
73  
74    /**
75     * Thrown if a cellscanner but no codec to encode it with.
76     */
77    public static class CellScannerButNoCodecException extends HBaseIOException {};
78  
79    /**
80     * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
81     * <code>compressor</code>.
82     * @param codec
83     * @param compressor
84     * @Param cellScanner
85     * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
86     * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
87     * flipped and is ready for reading.  Use limit to find total size.
88     * @throws IOException
89     */
90    @SuppressWarnings("resource")
91    ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
92      final CellScanner cellScanner)
93    throws IOException {
94      return buildCellBlock(codec, compressor, cellScanner, null);
95    }
96  
97    /**
98     * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
99     * <code>compressor</code>.
100    * @param codec
101    * @param compressor
102    * @param cellScanner
103    * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
104    * our own ByteBuffer.
105    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
106    * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
107    * flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
108    * null, then this returned ByteBuffer came from there and should be returned to the pool when
109    * done.
110    * @throws IOException
111    */
112   @SuppressWarnings("resource")
113   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
114     final CellScanner cellScanner, final BoundedByteBufferPool pool)
115   throws IOException {
116     if (cellScanner == null) return null;
117     if (codec == null) throw new CellScannerButNoCodecException();
118     int bufferSize = this.cellBlockBuildingInitialBufferSize;
119     ByteBufferOutputStream baos = null;
120     if (pool != null) {
121       ByteBuffer bb = pool.getBuffer();
122       bufferSize = bb.capacity();
123       baos = new ByteBufferOutputStream(bb);
124     } else {
125       // Then we need to make our own to return.
126       if (cellScanner instanceof HeapSize) {
127         long longSize = ((HeapSize)cellScanner).heapSize();
128         // Just make sure we don't have a size bigger than an int.
129         if (longSize > Integer.MAX_VALUE) {
130           throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
131         }
132         bufferSize = ClassSize.align((int)longSize);
133       }
134       baos = new ByteBufferOutputStream(bufferSize);
135     }
136     OutputStream os = baos;
137     Compressor poolCompressor = null;
138     try {
139       if (compressor != null) {
140         if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
141         poolCompressor = CodecPool.getCompressor(compressor);
142         os = compressor.createOutputStream(os, poolCompressor);
143       }
144       Codec.Encoder encoder = codec.getEncoder(os);
145       int count = 0;
146       while (cellScanner.advance()) {
147         encoder.write(cellScanner.current());
148         count++;
149       }
150       encoder.flush();
151       // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
152       // gets or something -- stuff that does not return a cell).
153       if (count == 0) return null;
154     } finally {
155       os.close();
156       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
157     }
158     if (LOG.isTraceEnabled()) {
159       if (bufferSize < baos.size()) {
160         LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
161           "; up hbase.ipc.cellblock.building.initial.buffersize?");
162       }
163     }
164     return baos.getByteBuffer();
165   }
166 
167   /**
168    * @param codec
169    * @param cellBlock
170    * @return CellScanner to work against the content of <code>cellBlock</code>
171    * @throws IOException
172    */
173   CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
174       final byte [] cellBlock)
175   throws IOException {
176     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
177   }
178 
179   /**
180    * @param codec
181    * @param cellBlock
182    * @param offset
183    * @param length
184    * @return CellScanner to work against the content of <code>cellBlock</code>
185    * @throws IOException
186    */
187   CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
188       final byte [] cellBlock, final int offset, final int length)
189   throws IOException {
190     // If compressed, decompress it first before passing it on else we will leak compression
191     // resources if the stream is not closed properly after we let it out.
192     InputStream is = null;
193     if (compressor != null) {
194       // GZIPCodec fails w/ NPE if no configuration.
195       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
196       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
197       CompressionInputStream cis =
198         compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
199         poolDecompressor);
200       ByteBufferOutputStream bbos = null;
201       try {
202         // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
203         // TODO: Reuse buffers.
204         bbos = new ByteBufferOutputStream((length - offset) *
205           this.cellBlockDecompressionMultiplier);
206         IOUtils.copy(cis, bbos);
207         bbos.close();
208         ByteBuffer bb = bbos.getByteBuffer();
209         is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
210       } finally {
211         if (is != null) is.close();
212         if (bbos != null) bbos.close();
213 
214         CodecPool.returnDecompressor(poolDecompressor);
215       }
216     } else {
217       is = new ByteArrayInputStream(cellBlock, offset, length);
218     }
219     return codec.getDecoder(is);
220   }
221 
222   /**
223    * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
224    * serialization.
225    * @return The passed in Message serialized with delimiter.  Return null if <code>m</code> is null
226    * @throws IOException
227    */
228   static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
229     if (m == null) return null;
230     int serializedSize = m.getSerializedSize();
231     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
232     byte [] buffer = new byte[serializedSize + vintSize];
233     // Passing in a byte array saves COS creating a buffer which it does when using streams.
234     CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
235     // This will write out the vint preamble and the message serialized.
236     cos.writeMessageNoTag(m);
237     cos.flush();
238     cos.checkNoSpaceLeft();
239     return ByteBuffer.wrap(buffer);
240   }
241 
242   /**
243    * Write out header, param, and cell block if there is one.
244    * @param dos
245    * @param header
246    * @param param
247    * @param cellBlock
248    * @return Total number of bytes written.
249    * @throws IOException
250    */
251   static int write(final OutputStream dos, final Message header, final Message param,
252       final ByteBuffer cellBlock)
253   throws IOException {
254     // Must calculate total size and write that first so other side can read it all in in one
255     // swoop.  This is dictated by how the server is currently written.  Server needs to change
256     // if we are to be able to write without the length prefixing.
257     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
258     if (cellBlock != null) totalSize += cellBlock.remaining();
259     return write(dos, header, param, cellBlock, totalSize);
260   }
261 
262   private static int write(final OutputStream dos, final Message header, final Message param,
263     final ByteBuffer cellBlock, final int totalSize)
264   throws IOException {
265     // I confirmed toBytes does same as DataOutputStream#writeInt.
266     dos.write(Bytes.toBytes(totalSize));
267     // This allocates a buffer that is the size of the message internally.
268     header.writeDelimitedTo(dos);
269     if (param != null) param.writeDelimitedTo(dos);
270     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
271     dos.flush();
272     return totalSize;
273   }
274 
275   /**
276    * Read in chunks of 8K (HBASE-7239)
277    * @param in
278    * @param dest
279    * @param offset
280    * @param len
281    * @throws IOException
282    */
283   static void readChunked(final DataInput in, byte[] dest, int offset, int len)
284       throws IOException {
285     int maxRead = 8192;
286 
287     for (; offset < len; offset += maxRead) {
288       in.readFully(dest, offset, Math.min(len - offset, maxRead));
289     }
290   }
291 
292   /**
293    * @param header
294    * @param body
295    * @return Size on the wire when the two messages are written with writeDelimitedTo
296    */
297   static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
298     int totalSize = 0;
299     for (Message m: messages) {
300       if (m == null) continue;
301       totalSize += m.getSerializedSize();
302       totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
303     }
304     Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
305     return totalSize;
306   }
307 }