1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.codec.BaseDecoder;
31 import org.apache.hadoop.hbase.codec.BaseEncoder;
32 import org.apache.hadoop.hbase.codec.Codec;
33 import org.apache.hadoop.hbase.codec.KeyValueCodec;
34 import org.apache.hadoop.hbase.io.util.Dictionary;
35 import org.apache.hadoop.hbase.io.util.StreamUtils;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.ReflectionUtils;
38 import org.apache.hadoop.io.IOUtils;
39
40 import com.google.protobuf.ByteString;
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
51 public class WALCellCodec implements Codec {
52
53 public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
54
55 protected final CompressionContext compression;
56 protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
57 @Override
58 public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
59 return WALCellCodec.uncompressByteString(data, dict);
60 }
61 };
62
63
64
65
/**
 * Creates a codec with no compression context, so the encoder and decoder handed out by this
 * instance are the uncompressed ones.
 */
public WALCellCodec() {
  // Same effect as assigning null directly: the two-argument constructor only stores its
  // second argument.
  this(null, null);
}

/**
 * Creates a codec bound to the given compression context. The
 * {@code (Configuration, CompressionContext)} parameter list is the one the static
 * {@code create(...)} factories look up reflectively.
 *
 * @param conf configuration; not read by this constructor
 * @param compression compression context to use, or {@code null} for uncompressed cells
 */
public WALCellCodec(Configuration conf, CompressionContext compression) {
  this.compression = compression;
}
80
81 static String getWALCellCodecClass(Configuration conf) {
82 return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public static WALCellCodec create(Configuration conf, String cellCodecClsName,
98 CompressionContext compression) throws UnsupportedOperationException {
99 if (cellCodecClsName == null) {
100 cellCodecClsName = getWALCellCodecClass(conf);
101 }
102 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
103 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117 public static WALCellCodec create(Configuration conf,
118 CompressionContext compression) throws UnsupportedOperationException {
119 String cellCodecClsName = getWALCellCodecClass(conf);
120 return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
121 { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
122 }
123
124 public interface ByteStringCompressor {
125 ByteString compress(byte[] data, Dictionary dict) throws IOException;
126 }
127
128 public interface ByteStringUncompressor {
129 byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
130 }
131
132
133
134
135 static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
136 public ByteString toByteString() {
137 return ByteString.copyFrom(this.buf, 0, this.count);
138 }
139
140 @Override
141 public ByteString compress(byte[] data, Dictionary dict) throws IOException {
142 writeCompressed(data, dict);
143 ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
144 reset();
145 return result;
146 }
147
148 private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
149 assert dict != null;
150 short dictIdx = dict.findEntry(data, 0, data.length);
151 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
152 write(Dictionary.NOT_IN_DICTIONARY);
153 StreamUtils.writeRawVInt32(this, data.length);
154 write(data, 0, data.length);
155 } else {
156 StreamUtils.writeShort(this, dictIdx);
157 }
158 }
159 }
160
161 private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
162 InputStream in = bs.newInput();
163 byte status = (byte)in.read();
164 if (status == Dictionary.NOT_IN_DICTIONARY) {
165 byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
166 int bytesRead = in.read(arr);
167 if (bytesRead != arr.length) {
168 throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
169 }
170 if (dict != null) dict.addEntry(arr, 0, arr.length);
171 return arr;
172 } else {
173
174 short dictIdx = StreamUtils.toShort(status, (byte)in.read());
175 byte[] entry = dict.getEntry(dictIdx);
176 if (entry == null) {
177 throw new IOException("Missing dictionary entry for index " + dictIdx);
178 }
179 return entry;
180 }
181 }
182
183 static class CompressedKvEncoder extends BaseEncoder {
184 private final CompressionContext compression;
185 public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
186 super(out);
187 this.compression = compression;
188 }
189
190 @Override
191 public void write(Cell cell) throws IOException {
192 if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
193 KeyValue kv = (KeyValue)cell;
194 byte[] kvBuffer = kv.getBuffer();
195 int offset = kv.getOffset();
196
197
198 StreamUtils.writeRawVInt32(out, kv.getKeyLength());
199 StreamUtils.writeRawVInt32(out, kv.getValueLength());
200
201 int tagsLength = kv.getTagsLengthUnsigned();
202 StreamUtils.writeRawVInt32(out, tagsLength);
203
204
205
206 write(kvBuffer, kv.getRowOffset(), kv.getRowLength(), compression.rowDict);
207 write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
208 write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
209
210
211 int pos = kv.getTimestampOffset();
212 int tsTypeValLen = kv.getLength() + offset - pos;
213 if (tagsLength > 0) {
214 tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
215 }
216 assert tsTypeValLen > 0;
217 out.write(kvBuffer, pos, tsTypeValLen);
218 if (tagsLength > 0) {
219 if (compression.tagCompressionContext != null) {
220
221 compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
222 tagsLength);
223 } else {
224
225
226 out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
227 }
228 }
229 }
230
231 private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
232 short dictIdx = Dictionary.NOT_IN_DICTIONARY;
233 if (dict != null) {
234 dictIdx = dict.findEntry(data, offset, length);
235 }
236 if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
237 out.write(Dictionary.NOT_IN_DICTIONARY);
238 StreamUtils.writeRawVInt32(out, length);
239 out.write(data, offset, length);
240 } else {
241 StreamUtils.writeShort(out, dictIdx);
242 }
243 }
244 }
245
246 static class CompressedKvDecoder extends BaseDecoder {
247 private final CompressionContext compression;
248 public CompressedKvDecoder(InputStream in, CompressionContext compression) {
249 super(in);
250 this.compression = compression;
251 }
252
253 @Override
254 protected Cell parseCell() throws IOException {
255 int keylength = StreamUtils.readRawVarint32(in);
256 int vlength = StreamUtils.readRawVarint32(in);
257
258 int tagsLength = StreamUtils.readRawVarint32(in);
259 int length = 0;
260 if(tagsLength == 0) {
261 length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
262 } else {
263 length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
264 }
265
266 byte[] backingArray = new byte[length];
267 int pos = 0;
268 pos = Bytes.putInt(backingArray, pos, keylength);
269 pos = Bytes.putInt(backingArray, pos, vlength);
270
271
272 int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
273 checkLength(elemLen, Short.MAX_VALUE);
274 pos = Bytes.putShort(backingArray, pos, (short)elemLen);
275 pos += elemLen;
276
277
278 elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
279 checkLength(elemLen, Byte.MAX_VALUE);
280 pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
281 pos += elemLen;
282
283
284 elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
285 pos += elemLen;
286
287
288 int tsTypeValLen = length - pos;
289 if (tagsLength > 0) {
290 tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
291 }
292 IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
293 pos += tsTypeValLen;
294
295
296 if (tagsLength > 0) {
297 pos = Bytes.putAsShort(backingArray, pos, tagsLength);
298 if (compression.tagCompressionContext != null) {
299 compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
300 } else {
301 IOUtils.readFully(in, backingArray, pos, tagsLength);
302 }
303 }
304 return new KeyValue(backingArray, 0, length);
305 }
306
307 private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
308 byte status = (byte)in.read();
309 if (status == Dictionary.NOT_IN_DICTIONARY) {
310
311
312 int length = StreamUtils.readRawVarint32(in);
313 IOUtils.readFully(in, to, offset, length);
314 dict.addEntry(to, offset, length);
315 return length;
316 } else {
317
318 short dictIdx = StreamUtils.toShort(status, (byte)in.read());
319 byte[] entry = dict.getEntry(dictIdx);
320 if (entry == null) {
321 throw new IOException("Missing dictionary entry for index " + dictIdx);
322 }
323
324 Bytes.putBytes(to, offset, entry, 0, entry.length);
325 return entry.length;
326 }
327 }
328
329 private static void checkLength(int len, int max) throws IOException {
330 if (len < 0 || len > max) {
331 throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
332 }
333 }
334 }
335
336 public class EnsureKvEncoder extends BaseEncoder {
337 public EnsureKvEncoder(OutputStream out) {
338 super(out);
339 }
340 @Override
341 public void write(Cell cell) throws IOException {
342 if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
343 checkFlushed();
344
345 KeyValue.oswrite((KeyValue) cell, this.out, true);
346 }
347 }
348
349 @Override
350 public Decoder getDecoder(InputStream is) {
351 return (compression == null)
352 ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
353 }
354
355 @Override
356 public Encoder getEncoder(OutputStream os) {
357 return (compression == null)
358 ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
359 }
360
361 public ByteStringCompressor getByteStringCompressor() {
362
363 return new BaosAndCompressor();
364 }
365
366 public ByteStringUncompressor getByteStringUncompressor() {
367
368 return this.statelessUncompressor;
369 }
370 }