1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.DataInput;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.nio.ByteBuffer;
26
27 import org.apache.commons.io.IOUtils;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configurable;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.CellScanner;
33 import org.apache.hadoop.hbase.HBaseIOException;
34 import org.apache.hadoop.hbase.codec.Codec;
35 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
36 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
37 import org.apache.hadoop.hbase.io.HeapSize;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.util.ClassSize;
40 import org.apache.hadoop.io.compress.CodecPool;
41 import org.apache.hadoop.io.compress.CompressionCodec;
42 import org.apache.hadoop.io.compress.CompressionInputStream;
43 import org.apache.hadoop.io.compress.Compressor;
44 import org.apache.hadoop.io.compress.Decompressor;
45
46 import com.google.common.base.Preconditions;
47 import com.google.protobuf.CodedOutputStream;
48 import com.google.protobuf.Message;
49
50
51
52
53 class IPCUtil {
/** Shared logger for this class. */
public static final Log LOG = LogFactory.getLog(IPCUtil.class);

/**
 * Factor applied to the size of a compressed cellblock when guessing how big a buffer to
 * allocate for its decompressed bytes.
 */
private final int cellBlockDecompressionMultiplier;

/** Starting size of the buffer used to build a cellblock when no other size hint is available. */
private final int cellBlockBuildingInitialBufferSize;

/** Configuration passed on to compression codecs that are {@link Configurable}. */
private final Configuration conf;

/**
 * Creates an instance whose buffer-sizing settings are read from the given configuration.
 *
 * @param conf source of the settings below; also retained for configuring codecs later
 */
IPCUtil(final Configuration conf) {
  this.conf = conf;
  // Defaults to 3 when the key is not set.
  this.cellBlockDecompressionMultiplier =
      conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
  // Defaults to 16 * 1024 when the key is not set; the value is passed through ClassSize.align.
  final int configuredInitialSize =
      conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
  this.cellBlockBuildingInitialBufferSize = ClassSize.align(configuredInitialSize);
}
73
74
75
76
77 public static class CellScannerButNoCodecException extends HBaseIOException {};
78
79
80
81
82
83
84
85
86
87
88
89
90 @SuppressWarnings("resource")
91 ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
92 final CellScanner cellScanner)
93 throws IOException {
94 return buildCellBlock(codec, compressor, cellScanner, null);
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 @SuppressWarnings("resource")
113 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
114 final CellScanner cellScanner, final BoundedByteBufferPool pool)
115 throws IOException {
116 if (cellScanner == null) return null;
117 if (codec == null) throw new CellScannerButNoCodecException();
118 int bufferSize = this.cellBlockBuildingInitialBufferSize;
119 ByteBufferOutputStream baos = null;
120 if (pool != null) {
121 ByteBuffer bb = pool.getBuffer();
122 bufferSize = bb.capacity();
123 baos = new ByteBufferOutputStream(bb);
124 } else {
125
126 if (cellScanner instanceof HeapSize) {
127 long longSize = ((HeapSize)cellScanner).heapSize();
128
129 if (longSize > Integer.MAX_VALUE) {
130 throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
131 }
132 bufferSize = ClassSize.align((int)longSize);
133 }
134 baos = new ByteBufferOutputStream(bufferSize);
135 }
136 OutputStream os = baos;
137 Compressor poolCompressor = null;
138 try {
139 if (compressor != null) {
140 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
141 poolCompressor = CodecPool.getCompressor(compressor);
142 os = compressor.createOutputStream(os, poolCompressor);
143 }
144 Codec.Encoder encoder = codec.getEncoder(os);
145 int count = 0;
146 while (cellScanner.advance()) {
147 encoder.write(cellScanner.current());
148 count++;
149 }
150 encoder.flush();
151
152
153 if (count == 0) return null;
154 } finally {
155 os.close();
156 if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
157 }
158 if (LOG.isTraceEnabled()) {
159 if (bufferSize < baos.size()) {
160 LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
161 "; up hbase.ipc.cellblock.building.initial.buffersize?");
162 }
163 }
164 return baos.getByteBuffer();
165 }
166
167
168
169
170
171
172
173 CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
174 final byte [] cellBlock)
175 throws IOException {
176 return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
177 }
178
179
180
181
182
183
184
185
186
187 CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
188 final byte [] cellBlock, final int offset, final int length)
189 throws IOException {
190
191
192 InputStream is = null;
193 if (compressor != null) {
194
195 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
196 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
197 CompressionInputStream cis =
198 compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
199 poolDecompressor);
200 ByteBufferOutputStream bbos = null;
201 try {
202
203
204 bbos = new ByteBufferOutputStream((length - offset) *
205 this.cellBlockDecompressionMultiplier);
206 IOUtils.copy(cis, bbos);
207 bbos.close();
208 ByteBuffer bb = bbos.getByteBuffer();
209 is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
210 } finally {
211 if (is != null) is.close();
212 if (bbos != null) bbos.close();
213
214 CodecPool.returnDecompressor(poolDecompressor);
215 }
216 } else {
217 is = new ByteArrayInputStream(cellBlock, offset, length);
218 }
219 return codec.getDecoder(is);
220 }
221
222
223
224
225
226
227
228 static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
229 if (m == null) return null;
230 int serializedSize = m.getSerializedSize();
231 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
232 byte [] buffer = new byte[serializedSize + vintSize];
233
234 CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
235
236 cos.writeMessageNoTag(m);
237 cos.flush();
238 cos.checkNoSpaceLeft();
239 return ByteBuffer.wrap(buffer);
240 }
241
242
243
244
245
246
247
248
249
250
251 static int write(final OutputStream dos, final Message header, final Message param,
252 final ByteBuffer cellBlock)
253 throws IOException {
254
255
256
257 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
258 if (cellBlock != null) totalSize += cellBlock.remaining();
259 return write(dos, header, param, cellBlock, totalSize);
260 }
261
262 private static int write(final OutputStream dos, final Message header, final Message param,
263 final ByteBuffer cellBlock, final int totalSize)
264 throws IOException {
265
266 dos.write(Bytes.toBytes(totalSize));
267
268 header.writeDelimitedTo(dos);
269 if (param != null) param.writeDelimitedTo(dos);
270 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
271 dos.flush();
272 return totalSize;
273 }
274
275
276
277
278
279
280
281
282
283 static void readChunked(final DataInput in, byte[] dest, int offset, int len)
284 throws IOException {
285 int maxRead = 8192;
286
287 for (; offset < len; offset += maxRead) {
288 in.readFully(dest, offset, Math.min(len - offset, maxRead));
289 }
290 }
291
292
293
294
295
296
297 static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
298 int totalSize = 0;
299 for (Message m: messages) {
300 if (m == null) continue;
301 totalSize += m.getSerializedSize();
302 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
303 }
304 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
305 return totalSize;
306 }
307 }