/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.security.Key;
import java.security.KeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
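/**
 * A Store holds a column family in a region. It bundles the family's memstore
 * with the set of zero or more store files (HFiles) that back it on disk, and
 * mediates flushes, compactions, bulk loads and scans for that family.
 */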
@InterfaceAudience.Private
public class HStore implements Store {
  public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
      "hbase.server.compactchecker.interval.multiplier";
  public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
  public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
  public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;

  static final Log LOG = LogFactory.getLog(HStore.class);

  protected final MemStore memstore;

  private final HRegion region;
  private final HColumnDescriptor family;
  private final HRegionFileSystem fs;
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private long lastCompactSize = 0;
  volatile boolean forceMajor = false;

  static int closeCheckInterval = 0;
  private volatile long storeSize = 0L;
  private volatile long totalUncompressedBytes = 0L;
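  /**
   * RWLock for store operations: taken as read for memstore updates and
   * scanner creation, and as write for structural changes such as closing the
   * store or swapping the store file set after a flush or compaction.
   */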
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final boolean verifyBulkLoads;

  private ScanInfo scanInfo;

  // All access to this list must be synchronized on the list itself.
  final List<StoreFile> filesCompacting = Lists.newArrayList();

  private final Set<ChangedReadersObserver> changedReaderObservers =
      Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

  private final int blocksize;
  private HFileDataBlockEncoder dataBlockEncoder;

  private ChecksumType checksumType;
  private int bytesPerChecksum;

  private final KeyValue.KVComparator comparator;

  final StoreEngine<?, ?, ?, ?> storeEngine;

  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
  private final OffPeakHours offPeakHours;

  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
  private int flushRetriesNumber;
  private int pauseTime;

  private long blockingFileCount;
  private int compactionCheckMultiplier;

  private Encryption.Context cryptoContext = Encryption.Context.NONE;

  private volatile long flushedCellsCount = 0;
  private volatile long compactedCellsCount = 0;
  private volatile long majorCompactedCellsCount = 0;
  private volatile long flushedCellsSize = 0;
  private volatile long compactedCellsSize = 0;
  private volatile long majorCompactedCellsSize = 0;
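  /**
   * Constructor.
   * @param region the region this store belongs to
   * @param family descriptor of the column family this store holds
   * @param confParam base configuration; the store layers table and family
   *   level overrides on top of it
   * @throws IOException if the store directory cannot be created or existing
   *   store files cannot be opened
   */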
  protected HStore(final HRegion region, final HColumnDescriptor family,
      final Configuration confParam) throws IOException {

    HRegionInfo info = region.getRegionInfo();
    this.fs = region.getRegionFileSystem();

    fs.createStoreDir(family.getNameAsString());
    this.region = region;
    this.family = family;

    this.conf = new CompoundConfiguration()
        .add(confParam)
        .addStringMap(region.getTableDesc().getConfiguration())
        .addStringMap(family.getConfiguration())
        .addWritableMap(family.getValues());
    this.blocksize = family.getBlocksize();

    this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());

    this.comparator = info.getComparator();

    long timeToPurgeDeletes =
        Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes + "ms in store " + this);

    long ttl = determineTTLFromFamily(family);

    scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
    this.memstore = new MemStore(conf, this.comparator);
    this.offPeakHours = OffPeakHours.getInstance(conf);

    this.cacheConf = new CacheConfig(conf, family);

    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);

    this.blockingFileCount =
        conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
    this.compactionCheckMultiplier = conf.getInt(
        COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
    if (this.compactionCheckMultiplier <= 0) {
      LOG.error("Compaction check period multiplier must be positive, setting default: "
          + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
      this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
    }

    if (HStore.closeCheckInterval == 0) {
      HStore.closeCheckInterval = conf.getInt(
          "hbase.hstore.close.check.interval", 10 * 1000 * 1000);
    }

    this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
    this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());

    this.checksumType = getChecksumType(conf);

    this.bytesPerChecksum = getBytesPerChecksum(conf);
    flushRetriesNumber = conf.getInt(
        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
    if (flushRetriesNumber <= 0) {
      throw new IllegalArgumentException(
          "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
    }

    // Set up the encryption context if the family requires encryption.
    String cipherName = family.getEncryptionType();
    if (cipherName != null) {
      Cipher cipher;
      Key key;
      byte[] keyBytes = family.getEncryptionKey();
      if (keyBytes != null) {
        String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
            User.getCurrent().getShortName());
        try {
          key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
        } catch (KeyException e) {
          // Try the alternate master key, if one is configured.
          if (LOG.isDebugEnabled()) {
            LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
          }
          String alternateKeyName =
              conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
          if (alternateKeyName != null) {
            try {
              key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
            } catch (KeyException ex) {
              throw new IOException(ex);
            }
          } else {
            throw new IOException(e);
          }
        }
        cipher = Encryption.getCipher(conf, key.getAlgorithm());
        if (cipher == null) {
          throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
        }
        if (!cipher.getName().equalsIgnoreCase(cipherName)) {
          throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
              "' configured with type '" + cipherName +
              "' but key specifies algorithm '" + cipher.getName() + "'");
        }
      } else {
        // Family does not provide a key; create a random one.
        cipher = Encryption.getCipher(conf, cipherName);
        if (cipher == null) {
          throw new RuntimeException("Cipher '" + cipherName + "' is not available");
        }
        key = cipher.getRandomKey();
      }
      cryptoContext = Encryption.newContext(conf);
      cryptoContext.setCipher(cipher);
      cryptoContext.setKey(key);
    }
  }
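  /**
   * @return TTL in milliseconds for the given family, with FOREVER and the
   *   unset value (-1) both mapped to Long.MAX_VALUE
   */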
  private static long determineTTLFromFamily(final HColumnDescriptor family) {
    // HColumnDescriptor TTL is in seconds; milliseconds are used internally.
    long ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      // -1 also means no TTL.
      ttl = Long.MAX_VALUE;
    } else {
      // Convert seconds to milliseconds.
      ttl *= 1000;
    }
    return ttl;
  }

  @Override
  public String getColumnFamilyName() {
    return this.family.getNameAsString();
  }

  @Override
  public TableName getTableName() {
    return this.getRegionInfo().getTable();
  }

  @Override
  public FileSystem getFileSystem() {
    return this.fs.getFileSystem();
  }

  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }

  @Override
  public long getStoreFileTtl() {
    // TTL is applicable for the store only when minVersions is 0.
    return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
  }

  @Override
  public long getMemstoreFlushSize() {
    return this.region.memstoreFlushSize;
  }

  @Override
  public long getFlushableSize() {
    return this.memstore.getFlushableSize();
  }

  @Override
  public long getCompactionCheckMultiplier() {
    return this.compactionCheckMultiplier;
  }

  @Override
  public long getBlockingFileCount() {
    return blockingFileCount;
  }
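  /**
   * @param conf configuration to read from
   * @return the number of bytes per checksum chunk
   *   ({@link HConstants#BYTES_PER_CHECKSUM}), or the HFile default if unset
   */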
  public static int getBytesPerChecksum(Configuration conf) {
    return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
  }
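  /**
   * @param conf configuration to read from
   * @return the configured checksum algorithm
   *   ({@link HConstants#CHECKSUM_TYPE_NAME}), or the HFile default if unset
   */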
  public static ChecksumType getChecksumType(Configuration conf) {
    String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
    if (checksumName == null) {
      return HFile.DEFAULT_CHECKSUM_TYPE;
    } else {
      return ChecksumType.nameToType(checksumName);
    }
  }
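  /**
   * @return interval, in bytes written, at which long-running compactions
   *   check whether the store is still open
   */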
  public static int getCloseCheckInterval() {
    return closeCheckInterval;
  }

  @Override
  public HColumnDescriptor getFamily() {
    return this.family;
  }
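  /**
   * @return the maximum sequence id across all of this store's files
   */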
  long getMaxSequenceId() {
    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
  }

  @Override
  public long getMaxMemstoreTS() {
    return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
  }
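  /**
   * @param tabledir {@link Path} to the table directory
   * @param hri region whose store directory is sought
   * @param family the column family name
   * @return path to the store directory for the family
   */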
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final HRegionInfo hri, final byte[] family) {
    return getStoreHomedir(tabledir, hri.getEncodedName(), family);
  }
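  /**
   * @param tabledir {@link Path} to the table directory
   * @param encodedName encoded name of the region
   * @param family the column family name
   * @return path to the store directory for the family
   */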
  @Deprecated
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte[] family) {
    return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
  }

  @Override
  public HFileDataBlockEncoder getDataBlockEncoder() {
    return dataBlockEncoder;
  }
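  /**
   * Replaces the data block encoder; intended for tests only.
   */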
  void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
    this.dataBlockEncoder = blockEncoder;
  }
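  /**
   * Opens this store's files in parallel and returns them as an unsorted list.
   * If any file fails to open, all readers opened so far are closed and the
   * failure is rethrown.
   */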
  private List<StoreFile> loadStoreFiles() throws IOException {
    Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
    if (files == null || files.size() == 0) {
      return new ArrayList<StoreFile>();
    }

    // Initialize a thread pool so store files open in parallel.
    ThreadPoolExecutor storeFileOpenerThreadPool =
        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
            this.getColumnFamilyName());
    CompletionService<StoreFile> completionService =
        new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);

    int totalValidStoreFile = 0;
    for (final StoreFileInfo storeFileInfo : files) {
      completionService.submit(new Callable<StoreFile>() {
        @Override
        public StoreFile call() throws IOException {
          StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
          return storeFile;
        }
      });
      totalValidStoreFile++;
    }

    ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
    IOException ioe = null;
    try {
      for (int i = 0; i < totalValidStoreFile; i++) {
        try {
          Future<StoreFile> future = completionService.take();
          StoreFile storeFile = future.get();
          long length = storeFile.getReader().length();
          this.storeSize += length;
          this.totalUncompressedBytes +=
              storeFile.getReader().getTotalUncompressedBytes();
          if (LOG.isDebugEnabled()) {
            LOG.debug("loaded " + storeFile.toStringDetailed());
          }
          results.add(storeFile);
        } catch (InterruptedException e) {
          if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
        } catch (ExecutionException e) {
          if (ioe == null) ioe = new IOException(e.getCause());
        }
      }
    } finally {
      storeFileOpenerThreadPool.shutdownNow();
    }
    if (ioe != null) {
      // An open failed; close the readers opened before the failure.
      for (StoreFile file : results) {
        try {
          if (file != null) file.closeReader(true);
        } catch (IOException e) {
          LOG.warn(e.getMessage());
        }
      }
      throw ioe;
    }

    return results;
  }

  private StoreFile createStoreFileAndReader(final Path p) throws IOException {
    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
    return createStoreFileAndReader(info);
  }

  private StoreFile createStoreFileAndReader(final StoreFileInfo info)
      throws IOException {
    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
    StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
        this.family.getBloomFilterType());
    storeFile.createReader();
    return storeFile;
  }

  @Override
  public long add(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.add(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public long timeOfOldestEdit() {
    return memstore.timeOfOldestEdit();
  }
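  /**
   * Adds a delete marker to the memstore.
   * @param kv the KeyValue (a delete) to add
   * @return memstore size delta
   */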
  protected long delete(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.delete(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public void rollback(final KeyValue kv) {
    lock.readLock().lock();
    try {
      this.memstore.rollback(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public Collection<StoreFile> getStorefiles() {
    return this.storeEngine.getStoreFileManager().getStorefiles();
  }

  @Override
  public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
          + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
      reader = HFile.createReader(srcPath.getFileSystem(conf),
          srcPath, cacheConf, conf);
      reader.loadFileInfo();

      byte[] firstKey = reader.getFirstRowKey();
      Preconditions.checkState(firstKey != null, "First key can not be null");
      byte[] lk = reader.getLastKey();
      Preconditions.checkState(lk != null, "Last key can not be null");
      byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();

      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
          " last=" + Bytes.toStringBinary(lastKey));
      LOG.debug("Region bounds: first=" +
          Bytes.toStringBinary(getRegionInfo().getStartKey()) +
          " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));

      if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
        throw new WrongRegionException(
            "Bulk load file " + srcPath.toString() + " does not fit inside region "
            + this.getRegionInfo().getRegionNameAsString());
      }

      if (reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
        LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
            reader.length() + " bytes can be problematic as it may lead to oversplitting.");
      }

      if (verifyBulkLoads) {
        long verificationStartTime = EnvironmentEdgeManager.currentTimeMillis();
        LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
        KeyValue prevKV = null;
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        do {
          KeyValue kv = scanner.getKeyValue();
          if (prevKV != null) {
            if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
                prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
                kv.getRowLength()) > 0) {
              throw new InvalidHFileException("Previous row is greater than"
                  + " current row: path=" + srcPath + " previous="
                  + Bytes.toStringBinary(prevKV.getKey()) + " current="
                  + Bytes.toStringBinary(kv.getKey()));
            }
            if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
                prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
                kv.getFamilyLength()) != 0) {
              throw new InvalidHFileException("Previous key had different"
                  + " family compared to current key: path=" + srcPath
                  + " previous=" + Bytes.toStringBinary(prevKV.getFamily())
                  + " current=" + Bytes.toStringBinary(kv.getFamily()));
            }
          }
          prevKV = kv;
        } while (scanner.next());
        LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
            + " took " + (EnvironmentEdgeManager.currentTimeMillis() - verificationStartTime)
            + " ms");
      }
    } finally {
      if (reader != null) reader.close();
    }
  }
  @Override
  public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
    Path srcPath = new Path(srcPathStr);
    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);

    StoreFile sf = createStoreFileAndReader(dstPath);

    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
        "' as " + dstPath + " - updating store file list.");

    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      this.lock.writeLock().unlock();
    }
    notifyChangedReadersObservers();
    LOG.info("Successfully loaded store file " + srcPath
        + " into store " + this + " (new location: " + dstPath + ")");
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTimeMillis() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

  @Override
  public ImmutableCollection<StoreFile> close() throws IOException {
    this.lock.writeLock().lock();
    try {
      ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();

      if (!result.isEmpty()) {
        ThreadPoolExecutor storeFileCloserThreadPool = this.region
            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                + this.getColumnFamilyName());

        CompletionService<Void> completionService =
            new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
        for (final StoreFile f : result) {
          completionService.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              f.closeReader(true);
              return null;
            }
          });
        }

        IOException ioe = null;
        try {
          for (int i = 0; i < result.size(); i++) {
            try {
              Future<Void> future = completionService.take();
              future.get();
            } catch (InterruptedException e) {
              if (ioe == null) {
                ioe = new InterruptedIOException();
                ioe.initCause(e);
              }
            } catch (ExecutionException e) {
              if (ioe == null) ioe = new IOException(e.getCause());
            }
          }
        } finally {
          storeFileCloserThreadPool.shutdownNow();
        }
        if (ioe != null) throw ioe;
      }
      LOG.info("Closed " + this);
      return result;
    } finally {
      this.lock.writeLock().unlock();
    }
  }
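  /**
   * Snapshots the current memstore under the write lock; the snapshot is then
   * retrievable via {@link MemStore#getSnapshot()}.
   */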
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      this.memstore.snapshot();
    } finally {
      this.lock.writeLock().unlock();
    }
  }
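  /**
   * Writes the given memstore snapshot out to one or more temporary store
   * files, retrying up to flushRetriesNumber times with pauseTime between
   * attempts before giving up.
   * @param logCacheFlushId sequence id of this flush
   * @param snapshot the memstore snapshot to write out
   * @param snapshotTimeRangeTracker time range covered by the snapshot
   * @param flushedSize accumulator for the number of bytes flushed
   * @param status monitored task to report progress to
   * @return paths of the files written, still under the tmp directory
   * @throws IOException if all retries fail
   */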
  protected List<Path> flushCache(final long logCacheFlushId,
      SortedSet<KeyValue> snapshot,
      TimeRangeTracker snapshotTimeRangeTracker,
      AtomicLong flushedSize,
      MonitoredTask status) throws IOException {

    StoreFlusher flusher = storeEngine.getStoreFlusher();
    IOException lastException = null;
    for (int i = 0; i < flushRetriesNumber; i++) {
      try {
        List<Path> pathNames = flusher.flushSnapshot(
            snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
        Path lastPathName = null;
        try {
          for (Path pathName : pathNames) {
            lastPathName = pathName;
            validateStoreFile(pathName);
          }
          return pathNames;
        } catch (Exception e) {
          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
          if (e instanceof IOException) {
            lastException = (IOException) e;
          } else {
            lastException = new IOException(e);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed flushing store file, retrying num=" + i, e);
        lastException = e;
      }
      if (lastException != null && i < (flushRetriesNumber - 1)) {
        try {
          Thread.sleep(pauseTime);
        } catch (InterruptedException e) {
          IOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw lastException;
  }
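  /**
   * Commits a flushed file: moves it from the tmp directory into place, opens
   * a reader on it, and updates the store size metrics.
   */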
  private StoreFile commitFile(final Path path,
      final long logCacheFlushId,
      TimeRangeTracker snapshotTimeRangeTracker,
      AtomicLong flushedSize,
      MonitoredTask status)
      throws IOException {
    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);

    status.setStatus("Flushing " + this + ": reopening flushed file");
    StoreFile sf = createStoreFileAndReader(dstPath);

    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
          ", sequenceid=" + logCacheFlushId +
          ", filesize=" + StringUtils.humanReadableInt(r.length()));
    }
    return sf;
  }
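  /**
   * Creates a writer for a new store file in the region's tmp directory.
   * @param maxKeyCount estimated number of keys the file will hold
   * @param compression compression algorithm, or null for the HFile default
   * @param isCompaction if true, data written through this writer is not
   *   cached on write
   * @param includeMVCCReadpoint whether MVCC read points are persisted
   * @param includesTag whether the cells carry tags
   */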
  @Override
  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
      throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Don't cache data on write on compactions.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }
    InetSocketAddress[] favoredNodes = null;
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
        cryptoContext);
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())
            .withFilePath(fs.createTempName())
            .withComparator(comparator)
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)
            .withFavoredNodes(favoredNodes)
            .withFileContext(hFileContext)
            .build();
    return w;
  }

  private HFileContext createFileContext(Compression.Algorithm compression,
      boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
    if (compression == null) {
      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    HFileContext hFileContext = new HFileContextBuilder()
        .withIncludesMvcc(includeMVCCReadpoint)
        .withIncludesTags(includesTag)
        .withCompression(compression)
        .withCompressTags(family.shouldCompressTags())
        .withChecksumType(checksumType)
        .withBytesPerCheckSum(bytesPerChecksum)
        .withBlockSize(blocksize)
        .withHBaseCheckSum(true)
        .withDataBlockEncoding(family.getDataBlockEncoding())
        .withEncryptionContext(cryptoContext)
        .build();
    return hFileContext;
  }
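  /**
   * Swaps the freshly flushed files into the store file set and clears the
   * memstore snapshot, both under the write lock, then notifies readers of
   * the change.
   * @return whether the store now needs compaction
   */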
  private boolean updateStorefiles(
      final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      this.memstore.clearSnapshot(set);
    } finally {
      this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers, after releasing the write lock.
    notifyChangedReadersObservers();

    if (LOG.isTraceEnabled()) {
      long totalSize = 0;
      for (StoreFile sf : sfs) {
        totalSize += sf.getReader().length();
      }
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTimeMillis() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
    return needsCompaction();
  }
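  /**
   * Tells all registered {@link ChangedReadersObserver}s that this store's
   * readers have changed.
   */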
  private void notifyChangedReadersObservers() throws IOException {
    for (ChangedReadersObserver o : this.changedReaderObservers) {
      o.updateReaders();
    }
  }
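  /**
   * Gets scanners over the memstore and over every store file that may
   * contain data for the requested range, selected under the read lock.
   */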
  @Override
  public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
      byte[] stopRow, long readPt) throws IOException {
    Collection<StoreFile> storeFilesToScan;
    List<KeyValueScanner> memStoreScanners;
    this.lock.readLock().lock();
    try {
      storeFilesToScan =
          this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
      memStoreScanners = this.memstore.getScanners(readPt);
    } finally {
      this.lock.readLock().unlock();
    }

    // Open the store file scanners outside of the lock; store files are
    // immutable, so holding references to them is sufficient.
    List<StoreFileScanner> sfScanners = StoreFileScanner
        .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
            readPt);
    List<KeyValueScanner> scanners =
        new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
    scanners.addAll(sfScanners);
    // Then the memstore scanners.
    scanners.addAll(memStoreScanners);
    return scanners;
  }

  @Override
  public void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  @Override
  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.remove(o);
  }
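  /**
   * Compact the store files selected in the given compaction context.
   * <p>Writes the merged output into the region's tmp directory, validates and
   * moves the new file(s) into place, records a compaction descriptor in the
   * WAL, swaps the new files into the store file set, and finally closes and
   * removes the compacted-away files. The selected files are released from
   * {@link #filesCompacting} in the finally block whether or not the
   * compaction succeeds.
   * @param compaction compaction details obtained from requestCompaction()
   * @param throughputController used to limit compaction I/O throughput
   * @return the new store file(s)
   */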
  @Override
  public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController) throws IOException {
    assert compaction != null;
    List<StoreFile> sfs = null;
    CompactionRequest cr = compaction.getRequest();
    try {
      // cr is cleaned up in the finally block below whether the compaction
      // succeeds or fails.
      long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
      assert compaction.hasSelection();
      Collection<StoreFile> filesToCompact = cr.getFiles();
      assert !filesToCompact.isEmpty();
      synchronized (filesCompacting) {
        // Sanity check: we are compacting files this store knows about.
        Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
      }

      LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
          + this + " of " + this.getRegionInfo().getRegionNameAsString()
          + " into tmpdir=" + fs.getTempDir() + ", totalSize="
          + StringUtils.humanReadableInt(cr.getSize()));

      List<Path> newFiles = compaction.compact(throughputController);

      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        LOG.warn("hbase.hstore.compaction.complete is set to false");
        sfs = new ArrayList<StoreFile>(newFiles.size());
        for (Path newFile : newFiles) {
          // Create a storefile around what we wrote, with a reader on it.
          StoreFile sf = createStoreFileAndReader(newFile);
          sf.closeReader(true);
          sfs.add(sf);
        }
        return sfs;
      }

      sfs = moveCompactedFilesIntoPlace(cr, newFiles);
      writeCompactionWalRecord(filesToCompact, sfs);
      replaceStoreFiles(filesToCompact, sfs);
      if (cr.isMajor()) {
        majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
        majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
      } else {
        compactedCellsCount += getCompactionProgress().totalCompactingKVs;
        compactedCellsSize += getCompactionProgress().totalCompactedSize;
      }
      // At this point the store will use the new files for all new scanners.
      completeCompaction(filesToCompact);

      logCompactionEndMessage(cr, sfs, compactionStartTime);
      return sfs;
    } finally {
      finishCompactionRequest(cr);
    }
  }

  private List<StoreFile> moveCompactedFilesIntoPlace(
      CompactionRequest cr, List<Path> newFiles) throws IOException {
    List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
    for (Path newFile : newFiles) {
      assert newFile != null;
      StoreFile sf = moveFileIntoPlace(newFile);
      if (this.getCoprocessorHost() != null) {
        this.getCoprocessorHost().postCompact(this, sf, cr);
      }
      assert sf != null;
      sfs.add(sf);
    }
    return sfs;
  }

  StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
    validateStoreFile(newFile);
    // Move the file into the right spot.
    Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
    return createStoreFileAndReader(destPath);
  }
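  /**
   * Writes the compaction descriptor to the WAL so that an interrupted
   * compaction can be recognized and completed on replay.
   * @param filesCompacted the files compacted away
   * @param newFiles the files replacing them
   */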
  private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
      Collection<StoreFile> newFiles) throws IOException {
    if (region.getLog() == null) return;
    List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
    for (StoreFile f : filesCompacted) {
      inputPaths.add(f.getPath());
    }
    List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
    for (StoreFile f : newFiles) {
      outputPaths.add(f.getPath());
    }
    HRegionInfo info = this.region.getRegionInfo();
    CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
        family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
    HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
        this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
  }

  private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
      final Collection<StoreFile> result) throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
      filesCompacting.removeAll(compactedFiles);
    } finally {
      this.lock.writeLock().unlock();
    }
  }
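  /**
   * Logs a summary message at the end of a compaction, including output file
   * sizes and timing, plus a machine-readable trace line when trace logging
   * is enabled.
   */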
  private void logCompactionEndMessage(
      CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
    long now = EnvironmentEdgeManager.currentTimeMillis();
    StringBuilder message = new StringBuilder(
        "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
        + cr.getFiles().size() + " file(s) in " + this + " of "
        + this.getRegionInfo().getRegionNameAsString()
        + " into ");
    if (sfs.isEmpty()) {
      message.append("none, ");
    } else {
      for (StoreFile sf : sfs) {
        message.append(sf.getPath().getName());
        message.append("(size=");
        message.append(StringUtils.humanReadableInt(sf.getReader().length()));
        message.append("), ");
      }
    }
    message.append("total size for store is ")
        .append(StringUtils.humanReadableInt(storeSize))
        .append(". This selection was in queue for ")
        .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
        .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
        .append(" to execute.");
    LOG.info(message.toString());
    if (LOG.isTraceEnabled()) {
      int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
      long resultSize = 0;
      for (StoreFile sf : sfs) {
        resultSize += sf.getReader().length();
      }
      String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
          + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
          + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
      LOG.trace(traceMessage);
    }
  }
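  /**
   * Completes a compaction recorded in a WAL marker but not finished before a
   * crash: any listed input files that are still part of the store are
   * removed from the store file set and deleted.
   */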
  @Override
  public void completeCompactionMarker(CompactionDescriptor compaction)
      throws IOException {
    LOG.debug("Completing compaction from the WAL marker");
    List<String> compactionInputs = compaction.getCompactionInputList();

    String familyName = this.getColumnFamilyName();
    List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
    for (String compactionInput : compactionInputs) {
      Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
      inputPaths.add(inputPath);
    }

    // Some of the input files may already be gone; only the ones still in the
    // store file set need to be removed.
    List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
    for (StoreFile sf : this.getStorefiles()) {
      if (inputPaths.contains(sf.getQualifiedPath())) {
        inputStoreFiles.add(sf);
      }
    }

    this.replaceStoreFiles(inputStoreFiles, Collections.<StoreFile>emptyList());
    this.completeCompaction(inputStoreFiles);
  }
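  /**
   * Compacts the N most recent store files. For testing only; assumes the
   * default file selection policy.
   * @param N number of most recent files to compact
   */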
  public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
    List<StoreFile> filesToCompact;
    boolean isMajor;

    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
        if (!filesCompacting.isEmpty()) {
          // Exclude all files older than the newest file we're currently
          // compacting; this preserves contiguity.
          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
          int idx = filesToCompact.indexOf(last);
          Preconditions.checkArgument(idx != -1);
          filesToCompact.subList(0, idx + 1).clear();
        }
        int count = filesToCompact.size();
        if (N > count) {
          throw new RuntimeException("Not enough files");
        }

        filesToCompact = filesToCompact.subList(count - N, count);
        isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
        filesCompacting.addAll(filesToCompact);
        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
      }
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // Ready to go: we have the list of files to compact.
      List<Path> newFiles = ((DefaultCompactor) this.storeEngine.getCompactor())
          .compactForTesting(filesToCompact, isMajor);
      for (Path newFile : newFiles) {
        // Move the compaction result into place.
        StoreFile sf = moveFileIntoPlace(newFile);
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompact(this, sf, null);
        }
        replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
        completeCompaction(filesToCompact);
      }
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }
  }
  @Override
  public boolean hasReferences() {
    return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
  }

  @Override
  public CompactionProgress getCompactionProgress() {
    return this.storeEngine.getCompactor().getProgress();
  }

  @Override
  public boolean isMajorCompaction() throws IOException {
    for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      if (sf.getReader() == null) {
        LOG.debug("StoreFile " + sf + " has null Reader");
        return false;
      }
    }
    return storeEngine.getCompactionPolicy().isMajorCompaction(
        this.storeEngine.getStoreFileManager().getStorefiles());
  }

  @Override
  public CompactionContext requestCompaction() throws IOException {
    return requestCompaction(Store.NO_PRIORITY, null);
  }

  @Override
  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
      throws IOException {
    // Don't start compactions if writes are disabled.
    if (!this.areWritesEnabled()) {
      return null;
    }

    // Before we do compaction, try to get rid of unneeded (expired) files.
    removeUnneededFiles();

    CompactionContext compaction = storeEngine.createCompaction();
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // First, see if the coprocessor wants to override the selection.
        if (this.getCoprocessorHost() != null) {
          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
          boolean override = this.getCoprocessorHost().preCompactSelection(
              this, candidatesForCoproc, baseRequest);
          if (override) {
            // Coprocessor is overriding normal file selection.
            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
          }
        }

        // Normal case: let the store engine select the files.
        if (!compaction.hasSelection()) {
          boolean isUserCompaction = priority == Store.PRIORITY_USER;
          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
              offPeakCompactionTracker.compareAndSet(false, true);
          try {
            compaction.select(this.filesCompacting, isUserCompaction,
                mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
          } catch (IOException e) {
            if (mayUseOffPeak) {
              offPeakCompactionTracker.set(false);
            }
            throw e;
          }
          assert compaction.hasSelection();
          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
            // The policy did not use the off-peak slot we reserved; release it.
            offPeakCompactionTracker.set(false);
          }
        }
        if (this.getCoprocessorHost() != null) {
          this.getCoprocessorHost().postCompactSelection(
              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
        }

        // Merge the selection into the incoming request, if there is one.
        if (baseRequest != null) {
          compaction.forceSelect(
              baseRequest.combineWith(compaction.getRequest()));
        }

        // Finally, we have the resulting files list. Check if anything was selected.
        final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
        if (selectedFiles.isEmpty()) {
          return null;
        }

        addToCompactingFiles(selectedFiles);

        // If we're compacting every file, this is a major compaction.
        boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
        this.forceMajor = this.forceMajor && !isMajor;

        // Set common request properties.
        compaction.getRequest().setPriority(
            (priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
        compaction.getRequest().setIsMajor(isMajor);
        compaction.getRequest().setDescription(
            getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
      }
    } finally {
      this.lock.readLock().unlock();
    }

    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
        + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
    this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
    return compaction;
  }
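  /**
   * Adds the given files to {@link #filesCompacting}, failing if any of them
   * is already being compacted, and keeps the list sorted by sequence id.
   */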
  private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
    if (filesToAdd == null) return;
    // Check that we do not try to compact the same StoreFile twice.
    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
      Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
    }
    filesCompacting.addAll(filesToAdd);
    Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
  }

  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
    if (getFamily().getMinVersions() > 0) {
      LOG.debug("Skipping expired store file removal due to min version being " +
          getFamily().getMinVersions());
      return;
    }
    this.lock.readLock().lock();
    Collection<StoreFile> delSfs = null;
    try {
      synchronized (filesCompacting) {
        long cfTtl = getStoreFileTtl();
        if (cfTtl != Long.MAX_VALUE) {
          delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
              EnvironmentEdgeManager.currentTimeMillis() - cfTtl, filesCompacting);
          addToCompactingFiles(delSfs);
        }
      }
    } finally {
      this.lock.readLock().unlock();
    }
    if (delSfs == null || delSfs.isEmpty()) return;

    Collection<StoreFile> newFiles = new ArrayList<StoreFile>(); // No new files.
    writeCompactionWalRecord(delSfs, newFiles);
    replaceStoreFiles(delSfs, newFiles);
    completeCompaction(delSfs);
    LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
        + this + " of " + this.getRegionInfo().getRegionNameAsString()
        + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
  }

  @Override
  public void cancelRequestedCompaction(CompactionContext compaction) {
    finishCompactionRequest(compaction.getRequest());
  }

  private void finishCompactionRequest(CompactionRequest cr) {
    this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
    if (cr.isOffPeak()) {
      offPeakCompactionTracker.set(false);
      cr.setOffPeak(false);
    }
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
  }
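  /**
   * Validates a store file by opening and closing it. If it throws, the file
   * is left in the tmp location and never moved into place.
   * @param path path to the store file to validate
   */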
  private void validateStoreFile(Path path)
      throws IOException {
    StoreFile storeFile = null;
    try {
      storeFile = createStoreFileAndReader(path);
    } catch (IOException e) {
      LOG.error("Failed to open store file : " + path
          + ", keeping it in tmp location", e);
      throw e;
    } finally {
      if (storeFile != null) {
        storeFile.closeReader(false);
      }
    }
  }
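  /**
   * Performs the cleanup that follows a compaction or an expired-file drop:
   * notifies readers of the changed file set, closes and removes the
   * compacted-away files, and recomputes the store size metrics from the
   * files that remain.
   * @param compactedFiles the files that were replaced
   */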
  @VisibleForTesting
  protected void completeCompaction(final Collection<StoreFile> compactedFiles)
      throws IOException {
    try {
      // Tell observers that the readers changed before removing the
      // underlying files.
      notifyChangedReadersObservers();

      LOG.debug("Removing store files after compaction...");
      for (StoreFile compactedFile : compactedFiles) {
        compactedFile.closeReader(true);
      }
      this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.error("Failed removing compacted files in " + this +
          ". Files we were trying to remove are " + compactedFiles.toString() +
          "; some of them may have been already removed", e);
    }

    // Recompute this store's size from the remaining files.
    this.storeSize = 0L;
    this.totalUncompressedBytes = 0L;
    for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = hsf.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + hsf + " has a null Reader");
        continue;
      }
      this.storeSize += r.length();
      this.totalUncompressedBytes += r.getTotalUncompressedBytes();
    }
  }
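  /**
   * Clamps a requested number of versions to this family's configured maximum.
   */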
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    int maxVersions = this.family.getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }
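  /**
   * @return true if the cell carries a TTL tag and that TTL has elapsed as of
   *   {@code now}; cells without a TTL tag are never expired by this check
   */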
  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
    // Evaluate the cell-level TTL tag, if one is present.
    if (cell.getTagsLengthUnsigned() > 0) {
      Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
          cell.getTagsLengthUnsigned());
      while (i.hasNext()) {
        Tag t = i.next();
        if (TagType.TTL_TAG_TYPE == t.getType()) {
          // The TTL tag's value is a long in milliseconds, relative to the
          // cell's timestamp.
          long ts = cell.getTimestamp();
          assert t.getTagLength() == Bytes.SIZEOF_LONG;
          long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
          if (ts + ttl < now) {
            return true;
          }
          // Only the first TTL tag is considered.
          break;
        }
      }
    }
    return false;
  }
  @Override
  public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
    // If minVersions is set, we will not ignore expired KVs; the family TTL
    // only applies when minVersions is 0.
    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();

    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);

    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
        this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
    this.lock.readLock().lock();
    try {
      // First go to the memstore; pick up deletes and candidates.
      this.memstore.getRowKeyAtOrBefore(state);
      // Then check the store files, narrowing the candidate file set as
      // better candidates are found.
      Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
          .getCandidateFilesForRowKeyBefore(state.getTargetKey());
      while (sfIterator.hasNext()) {
        StoreFile sf = sfIterator.next();
        sfIterator.remove();
        boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
        KeyValue keyv = state.getCandidate();
        // An exact match on the requested row cannot be beaten; we are done.
        if (keyv != null && keyv.matchingRow(row)) return state.getCandidate();
        if (haveNewCandidate) {
          sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
              sfIterator, state.getTargetKey(), state.getCandidate());
        }
      }
      return state.getCandidate();
    } finally {
      this.lock.readLock().unlock();
    }
  }
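  /**
   * Checks a single store file for a candidate row at or before the tracker's
   * target key.
   * @return whether a new candidate was found and the tracker updated
   */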
  private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
      final GetClosestRowBeforeTracker state)
      throws IOException {
    StoreFile.Reader r = f.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + f + " has a null Reader");
      return false;
    }
    if (r.getEntries() == 0) {
      LOG.warn("StoreFile " + f + " is an empty store file");
      return false;
    }
    byte[] fk = r.getFirstKey();
    if (fk == null) return false;
    KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
    byte[] lk = r.getLastKey();
    KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
    KeyValue firstOnRow = state.getTargetKey();
    if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
      // If the last key in the file is not of the target table, there are no
      // candidates in this file.
      if (!state.isTargetTable(lastKV)) return false;
      // The target row is beyond the end of this file; search from the last
      // row in the file instead.
      firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
    }
    // Get a scanner that caches blocks and that uses pread.
    HFileScanner scanner = r.getScanner(true, true, false);
    // Seek scanner; if it cannot seek, this file holds no candidate.
    if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
    // If we found a candidate on this row, return it.
    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
    // Otherwise, keep moving to earlier rows while they remain viable
    // candidates in the target table.
    while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
        firstOnRow.getKeyLength())) {
      KeyValue kv = scanner.getKeyValue();
      if (!state.isTargetTable(kv)) break;
      if (!state.isBetterCandidate(kv)) break;
      // Make a new first-on-row key and try again.
      firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
      if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
      if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
    }
    return false;
  }
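  /**
   * Seeks the scanner to the given key, or to the file's first key when the
   * target row matches it.
   * @return false if the seek failed and this file holds no candidate
   */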
  private boolean seekToScanner(final HFileScanner scanner,
      final KeyValue firstOnRow,
      final KeyValue firstKV)
      throws IOException {
    KeyValue kv = firstOnRow;
    if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
    int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
        kv.getKeyLength());
    return result != -1;
  }
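  /**
   * Walks forward from the seek position while still within the row of
   * {@code firstOnRow}, offering each unexpired KeyValue to the tracker.
   * @return true if the tracker accepted a candidate
   */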
  private boolean walkForwardInSingleRow(final HFileScanner scanner,
      final KeyValue firstOnRow,
      final GetClosestRowBeforeTracker state)
      throws IOException {
    boolean foundCandidate = false;
    do {
      KeyValue kv = scanner.getKeyValue();
      // Keep going until we reach the row of firstOnRow.
      if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
      // Stop if we walked past the target row.
      if (state.isTooFar(kv, firstOnRow)) break;
      if (state.isExpired(kv)) {
        continue;
      }
      // If the tracker accepts this KV, we have a candidate.
      if (state.handle(kv)) {
        foundCandidate = true;
        break;
      }
    } while (scanner.next());
    return foundCandidate;
  }

  @Override
  public boolean canSplit() {
    this.lock.readLock().lock();
    try {
      // Not splittable if a reference store file is present in the store.
      boolean result = !hasReferences();
      if (!result && LOG.isDebugEnabled()) {
        LOG.debug("Cannot split region due to reference files being there");
      }
      return result;
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public byte[] getSplitPoint() {
    this.lock.readLock().lock();
    try {
      // Should already be enforced by the split policy.
      assert !this.getRegionInfo().isMetaRegion();
      // Not splittable if a reference store file is present in the store.
      if (hasReferences()) {
        return null;
      }
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch (IOException e) {
      LOG.warn("Failed getting store size for " + this, e);
    } finally {
      this.lock.readLock().unlock();
    }
    return null;
  }

  @Override
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  @Override
  public long getSize() {
    return storeSize;
  }

  @Override
  public void triggerMajorCompaction() {
    this.forceMajor = true;
  }

  boolean getForceMajorCompaction() {
    return this.forceMajor;
  }
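  /**
   * Returns a scanner over this store's data, giving coprocessors the chance
   * to substitute their own via preStoreScannerOpen.
   * @param scan spec of the scan
   * @param targetCols columns to scan, or null for all
   * @param readPt MVCC read point to observe
   */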
  @Override
  public KeyValueScanner getScanner(Scan scan,
      final NavigableSet<byte[]> targetCols, long readPt) throws IOException {
    lock.readLock().lock();
    try {
      KeyValueScanner scanner = null;
      if (this.getCoprocessorHost() != null) {
        scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
      }
      if (scanner == null) {
        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
            getScanInfo(), scan, targetCols, readPt);
      }
      return scanner;
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public String toString() {
    return this.getColumnFamilyName();
  }

  @Override
  public int getStorefilesCount() {
    return this.storeEngine.getStoreFileManager().getStorefileCount();
  }

  @Override
  public long getStoreSizeUncompressed() {
    return this.totalUncompressedBytes;
  }

  @Override
  public long getStorefilesSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + s + " has a null Reader");
        continue;
      }
      size += r.length();
    }
    return size;
  }

  @Override
  public long getStorefilesIndexSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + s + " has a null Reader");
        continue;
      }
      size += r.indexSize();
    }
    return size;
  }

  @Override
  public long getTotalStaticIndexSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      size += s.getReader().getUncompressedDataIndexSize();
    }
    return size;
  }

  @Override
  public long getTotalStaticBloomSize() {
    long size = 0;
    for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
      StoreFile.Reader r = s.getReader();
      size += r.getTotalBloomSize();
    }
    return size;
  }

  @Override
  public long getMemStoreSize() {
    return this.memstore.heapSize();
  }

  @Override
  public int getCompactPriority() {
    int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
    if (priority == PRIORITY_USER) {
      LOG.warn("Compaction priority is USER despite there being no user compaction");
    }
    return priority;
  }

  @Override
  public boolean throttleCompaction(long compactionSize) {
    return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
  }

  public HRegion getHRegion() {
    return this.region;
  }

  @Override
  public RegionCoprocessorHost getCoprocessorHost() {
    return this.region.getCoprocessorHost();
  }

  @Override
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }

  @Override
  public boolean areWritesEnabled() {
    return this.region.areWritesEnabled();
  }

  @Override
  public long getSmallestReadPoint() {
    return this.region.getSmallestReadPoint();
  }
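  /**
   * Updates the value for the given row/family/qualifier in the memstore.
   * Readers observe the update atomically because only a single KeyValue is
   * added.
   * @param row row to update
   * @param f family to update
   * @param qualifier qualifier to update
   * @param newValue the new value to set into memstore
   * @return memstore size delta
   */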
  public long updateColumnValue(byte[] row, byte[] f,
      byte[] qualifier, long newValue)
      throws IOException {
    this.lock.readLock().lock();
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();
      return this.memstore.updateColumnValue(row,
          f,
          qualifier,
          newValue,
          now);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
    this.lock.readLock().lock();
    try {
      return this.memstore.upsert(cells, readpoint);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  @Override
  public StoreFlushContext createFlushContext(long cacheFlushId) {
    return new StoreFlusherImpl(cacheFlushId);
  }

  private class StoreFlusherImpl implements StoreFlushContext {

    private long cacheFlushSeqNum;
    private SortedSet<KeyValue> snapshot;
    private List<Path> tempFiles;
    private TimeRangeTracker snapshotTimeRangeTracker;
    private long flushedCount;
    private final AtomicLong flushedSize = new AtomicLong();

    private StoreFlusherImpl(long cacheFlushSeqNum) {
      this.cacheFlushSeqNum = cacheFlushSeqNum;
    }
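    /**
     * Snapshots the memstore for flushing. Not thread safe on its own: the
     * caller is expected to hold a lock on the region or the store while this
     * runs.
     */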
    @Override
    public void prepare() {
      memstore.snapshot();
      this.snapshot = memstore.getSnapshot();
      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
      this.flushedCount = this.snapshot.size();
    }

    @Override
    public void flushCache(MonitoredTask status) throws IOException {
      tempFiles = HStore.this.flushCache(
          cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
    }

    @Override
    public boolean commit(MonitoredTask status) throws IOException {
      if (this.tempFiles == null || this.tempFiles.isEmpty()) {
        return false;
      }
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
      for (Path storeFilePath : tempFiles) {
        try {
          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
              snapshotTimeRangeTracker, flushedSize, status));
        } catch (IOException ex) {
          LOG.error("Failed to commit store file " + storeFilePath, ex);
          // Roll back: delete the store files committed so far in this flush.
          for (StoreFile sf : storeFiles) {
            Path pathToDelete = sf.getPath();
            try {
              sf.deleteReader();
            } catch (IOException deleteEx) {
              LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
              Runtime.getRuntime().halt(1);
            }
          }
          throw new IOException("Failed to commit the flush", ex);
        }
      }

      if (HStore.this.getCoprocessorHost() != null) {
        for (StoreFile sf : storeFiles) {
          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
        }
      }

      HStore.this.flushedCellsCount += flushedCount;
      HStore.this.flushedCellsSize += flushedSize.get();

      // Add new store files to the store's list and clear the snapshot.
      return HStore.this.updateStorefiles(storeFiles, snapshot);
    }
  }

  @Override
  public boolean needsCompaction() {
    return this.storeEngine.needsCompaction(this.filesCompacting);
  }

  @Override
  public CacheConfig getCacheConfig() {
    return this.cacheConf;
  }

  public static final long FIXED_OVERHEAD =
      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
          + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
      + ClassSize.CONCURRENT_SKIPLISTMAP
      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
      + ScanInfo.FIXED_OVERHEAD);

  @Override
  public long heapSize() {
    return DEEP_OVERHEAD + this.memstore.heapSize();
  }

  @Override
  public KeyValue.KVComparator getComparator() {
    return comparator;
  }

  @Override
  public ScanInfo getScanInfo() {
    return scanInfo;
  }
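  /**
   * Replaces this store's scan configuration.
   * @param scanInfo the new scan info to use
   */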
  void setScanInfo(ScanInfo scanInfo) {
    this.scanInfo = scanInfo;
  }

  @Override
  public boolean hasTooManyStoreFiles() {
    return getStorefilesCount() > this.blockingFileCount;
  }

  @Override
  public long getFlushedCellsCount() {
    return flushedCellsCount;
  }

  @Override
  public long getFlushedCellsSize() {
    return flushedCellsSize;
  }

  @Override
  public long getCompactedCellsCount() {
    return compactedCellsCount;
  }

  @Override
  public long getCompactedCellsSize() {
    return compactedCellsSize;
  }

  @Override
  public long getMajorCompactedCellsCount() {
    return majorCompactedCellsCount;
  }

  @Override
  public long getMajorCompactedCellsSize() {
    return majorCompactedCellsSize;
  }

  @Override
  public double getCompactionPressure() {
    return storeEngine.getStoreFileManager().getCompactionPressure();
  }
}