1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FSDataOutputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValue.KVComparator;
37 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
38 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
39 import org.apache.hadoop.hbase.util.BloomFilterWriter;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.io.Writable;
42 import org.apache.hadoop.io.WritableUtils;
43
44
45
46
47 @InterfaceAudience.Private
48 public class HFileWriterV2 extends AbstractHFileWriter {
/** Class logger. */
static final Log LOG = LogFactory.getLog(HFileWriterV2.class);

/** File-info key under which {@link #close()} stores the largest memstore timestamp seen. */
public static final byte [] MAX_MEMSTORE_TS_KEY =
    Bytes.toBytes("MAX_MEMSTORE_TS_KEY");

/** File-info key under which {@link #close()} stores the key/value format version. */
public static final byte [] KEY_VALUE_VERSION =
    Bytes.toBytes("KEY_VALUE_VERSION");

/** Value written for {@link #KEY_VALUE_VERSION} when the file context includes MVCC. */
public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

/**
 * Writers that get a chance to emit inline blocks between data blocks and at close.
 * The data block index writer is registered here by {@code finishInit}; others are
 * added through {@code addInlineBlockWriter}.
 */
private List<InlineBlockWriter> inlineBlockWriters =
    new ArrayList<InlineBlockWriter>();

/** Block writer used for every block this writer emits (data, inline, meta, index, file info). */
protected HFileBlock.Writer fsBlockWriter;

/** Index writer that receives one entry per finished data block. */
private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
/** Index writer that receives one entry per meta block written during close. */
private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

/** Stream position of the first data block, or -1 while no data block has been written. */
private long firstDataBlockOffset = -1;

/** Stream position of the most recently written data block (0 until one is written). */
protected long lastDataBlockOffset;

/** Copy of the last key appended before the current data block was started; null initially. */
private byte[] lastKeyOfPreviousBlock = null;

/** Extra blocks (Bloom filter metadata) written after the file info block during close. */
private List<BlockWritable> additionalLoadOnOpenData =
    new ArrayList<BlockWritable>();

/** Largest MVCC version passed to {@code append(KeyValue)} so far. */
protected long maxMemstoreTS = 0;
86
87 static class WriterFactoryV2 extends HFile.WriterFactory {
88 WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
89 super(conf, cacheConf);
90 }
91
92 @Override
93 public Writer createWriter(FileSystem fs, Path path,
94 FSDataOutputStream ostream,
95 KVComparator comparator, HFileContext context) throws IOException {
96 context.setIncludesTags(false);
97 return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
98 comparator, context);
99 }
100 }
101
102
/**
 * Constructs a version 2 writer and completes its initialization via
 * {@link #finishInit(Configuration)}.
 *
 * @param conf configuration used to create the output stream (when needed) and to finish init
 * @param cacheConf cache configuration handed to the superclass
 * @param fs file system used only when {@code ostream} is {@code null}
 * @param path destination path
 * @param ostream stream to write to, or {@code null} to have one created for {@code path}
 * @param comparator key comparator
 * @param context file context
 * @throws IOException if creating the output stream fails
 */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
    FileSystem fs, Path path, FSDataOutputStream ostream,
    final KVComparator comparator, final HFileContext context) throws IOException {
  super(cacheConf, outputStreamOrNew(conf, fs, path, ostream), path, comparator, context);
  finishInit(conf);
}

/** Returns {@code ostream} when it is non-null, otherwise creates a stream for {@code path}. */
private static FSDataOutputStream outputStreamOrNew(Configuration conf, FileSystem fs,
    Path path, FSDataOutputStream ostream) throws IOException {
  if (ostream != null) {
    return ostream;
  }
  return createOutputStream(conf, fs, path, null);
}
111
112
113 protected void finishInit(final Configuration conf) {
114 if (fsBlockWriter != null)
115 throw new IllegalStateException("finishInit called twice");
116
117 fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
118
119
120 boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
121 dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
122 cacheIndexesOnWrite ? cacheConf : null,
123 cacheIndexesOnWrite ? name : null);
124 dataBlockIndexWriter.setMaxChunkSize(
125 HFileBlockIndex.getMaxChunkSize(conf));
126 inlineBlockWriters.add(dataBlockIndexWriter);
127
128
129 metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
130 if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
131 }
132
133
134
135
136
137
138 protected void checkBlockBoundary() throws IOException {
139 if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
140 return;
141
142 finishBlock();
143 writeInlineBlocks(false);
144 newBlock();
145 }
146
147
148 private void finishBlock() throws IOException {
149 if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
150 return;
151
152
153 if (firstDataBlockOffset == -1) {
154 firstDataBlockOffset = outputStream.getPos();
155 }
156
157 lastDataBlockOffset = outputStream.getPos();
158 fsBlockWriter.writeHeaderAndData(outputStream);
159 int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
160
161 byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
162 dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);
163 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
164 if (cacheConf.shouldCacheDataOnWrite()) {
165 doCacheOnWrite(lastDataBlockOffset);
166 }
167 }
168
169
170 private void writeInlineBlocks(boolean closing) throws IOException {
171 for (InlineBlockWriter ibw : inlineBlockWriters) {
172 while (ibw.shouldWriteBlock(closing)) {
173 long offset = outputStream.getPos();
174 boolean cacheThisBlock = ibw.getCacheOnWrite();
175 ibw.writeInlineBlock(fsBlockWriter.startWriting(
176 ibw.getInlineBlockType()));
177 fsBlockWriter.writeHeaderAndData(outputStream);
178 ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
179 fsBlockWriter.getUncompressedSizeWithoutHeader());
180 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
181
182 if (cacheThisBlock) {
183 doCacheOnWrite(offset);
184 }
185 }
186 }
187 }
188
189
190
191
192
193
194 private void doCacheOnWrite(long offset) {
195 HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
196 cacheConf.getBlockCache().cacheBlock(
197 new BlockCacheKey(name, offset, blockEncoder.getDataBlockEncoding(),
198 cacheFormatBlock.getBlockType()), cacheFormatBlock);
199 }
200
201
202
203
204
205
206 protected void newBlock() throws IOException {
207
208 fsBlockWriter.startWriting(BlockType.DATA);
209 firstKeyInBlock = null;
210 if (lastKeyLength > 0) {
211 lastKeyOfPreviousBlock = new byte[lastKeyLength];
212 System.arraycopy(lastKeyBuffer, lastKeyOffset, lastKeyOfPreviousBlock, 0, lastKeyLength);
213 }
214 }
215
216
217
218
219
220
221
222
223
224
225
226
227 @Override
228 public void appendMetaBlock(String metaBlockName, Writable content) {
229 byte[] key = Bytes.toBytes(metaBlockName);
230 int i;
231 for (i = 0; i < metaNames.size(); ++i) {
232
233 byte[] cur = metaNames.get(i);
234 if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
235 key.length) > 0) {
236 break;
237 }
238 }
239 metaNames.add(i, key);
240 metaData.add(i, content);
241 }
242
243
244
245
246
247
248
249
250
251 @Override
252 public void append(final KeyValue kv) throws IOException {
253 append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
254 kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
255 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
256 }
257
258
259
260
261
262
263
264
265
266
267
268 @Override
269 public void append(final byte[] key, final byte[] value) throws IOException {
270 append(0, key, 0, key.length, value, 0, value.length);
271 }
272
273
274
275
276
277
278
279
280
281
282
283
284
285 protected void append(final long memstoreTS, final byte[] key, final int koffset,
286 final int klength, final byte[] value, final int voffset, final int vlength)
287 throws IOException {
288 boolean dupKey = checkKey(key, koffset, klength);
289 checkValue(value, voffset, vlength);
290 if (!dupKey) {
291 checkBlockBoundary();
292 }
293
294 if (!fsBlockWriter.isWriting())
295 newBlock();
296
297
298
299 {
300 DataOutputStream out = fsBlockWriter.getUserDataStream();
301 out.writeInt(klength);
302 totalKeyLength += klength;
303 out.writeInt(vlength);
304 totalValueLength += vlength;
305 out.write(key, koffset, klength);
306 out.write(value, voffset, vlength);
307 if (this.hFileContext.isIncludesMvcc()) {
308 WritableUtils.writeVLong(out, memstoreTS);
309 }
310 }
311
312
313 if (firstKeyInBlock == null) {
314
315 firstKeyInBlock = new byte[klength];
316 System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
317 }
318
319 lastKeyBuffer = key;
320 lastKeyOffset = koffset;
321 lastKeyLength = klength;
322 entryCount++;
323 }
324
325 @Override
326 public void close() throws IOException {
327 if (outputStream == null) {
328 return;
329 }
330
331 blockEncoder.saveMetadata(this);
332
333
334
335 finishBlock();
336 writeInlineBlocks(true);
337
338 FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
339
340
341 if (!metaNames.isEmpty()) {
342 for (int i = 0; i < metaNames.size(); ++i) {
343
344 long offset = outputStream.getPos();
345
346 DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
347 metaData.get(i).write(dos);
348
349 fsBlockWriter.writeHeaderAndData(outputStream);
350 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
351
352
353 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
354 fsBlockWriter.getOnDiskSizeWithHeader());
355 }
356 }
357
358
359
360
361
362
363
364
365
366
367 long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
368 trailer.setLoadOnOpenOffset(rootIndexOffset);
369
370
371 metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
372 BlockType.ROOT_INDEX), "meta");
373 fsBlockWriter.writeHeaderAndData(outputStream);
374 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
375
376 if (this.hFileContext.isIncludesMvcc()) {
377 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
378 appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
379 }
380
381
382 writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
383 fsBlockWriter.writeHeaderAndData(outputStream);
384 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
385
386
387 for (BlockWritable w : additionalLoadOnOpenData){
388 fsBlockWriter.writeBlock(w, outputStream);
389 totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
390 }
391
392
393 trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
394 trailer.setUncompressedDataIndexSize(
395 dataBlockIndexWriter.getTotalUncompressedSize());
396 trailer.setFirstDataBlockOffset(firstDataBlockOffset);
397 trailer.setLastDataBlockOffset(lastDataBlockOffset);
398 trailer.setComparatorClass(comparator.getClass());
399 trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
400
401
402 finishClose(trailer);
403
404 fsBlockWriter.release();
405 }
406
407 @Override
408 public void addInlineBlockWriter(InlineBlockWriter ibw) {
409 inlineBlockWriters.add(ibw);
410 }
411
412 @Override
413 public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
414 this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
415 }
416
417 @Override
418 public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
419 this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
420 }
421
422 private void addBloomFilter(final BloomFilterWriter bfw,
423 final BlockType blockType) {
424 if (bfw.getKeyCount() <= 0)
425 return;
426
427 if (blockType != BlockType.GENERAL_BLOOM_META &&
428 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
429 throw new RuntimeException("Block Type: " + blockType.toString() +
430 "is not supported");
431 }
432 additionalLoadOnOpenData.add(new BlockWritable() {
433 @Override
434 public BlockType getBlockType() {
435 return blockType;
436 }
437
438 @Override
439 public void writeToBlock(DataOutput out) throws IOException {
440 bfw.getMetaWriter().write(out);
441 Writable dataWriter = bfw.getDataWriter();
442 if (dataWriter != null)
443 dataWriter.write(out);
444 }
445 });
446 }
447
448 @Override
449 public void append(byte[] key, byte[] value, byte[] tag) throws IOException {
450 throw new UnsupportedOperationException("KV tags are supported only from HFile V3");
451 }
452
/**
 * Returns the major format version passed to the trailer built in {@link #close()}.
 *
 * @return always 2 for this writer
 */
protected int getMajorVersion() {
  return 2;
}
456
/**
 * Returns the minor format version passed to the trailer built in {@link #close()}.
 *
 * @return {@code HFileReaderV2.MAX_MINOR_VERSION}
 */
protected int getMinorVersion() {
  return HFileReaderV2.MAX_MINOR_VERSION;
}
460
461 @Override
/**
 * Returns the file context this writer was configured with.
 *
 * @return the writer's {@code hFileContext} field (the same instance, not a copy)
 */
@Override
public HFileContext getFileContext() {
  return hFileContext;
}
465 }