1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.io.UnsupportedEncodingException;
25 import java.lang.reflect.Constructor;
26 import java.text.ParseException;
27 import java.util.AbstractList;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NavigableMap;
37 import java.util.NavigableSet;
38 import java.util.RandomAccess;
39 import java.util.Set;
40 import java.util.TreeMap;
41 import java.util.UUID;
42 import java.util.concurrent.Callable;
43 import java.util.concurrent.CompletionService;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ConcurrentSkipListMap;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ExecutorCompletionService;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Executors;
51 import java.util.concurrent.Future;
52 import java.util.concurrent.FutureTask;
53 import java.util.concurrent.ThreadFactory;
54 import java.util.concurrent.ThreadPoolExecutor;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.TimeoutException;
57 import java.util.concurrent.atomic.AtomicBoolean;
58 import java.util.concurrent.atomic.AtomicInteger;
59 import java.util.concurrent.atomic.AtomicLong;
60 import java.util.concurrent.locks.Lock;
61 import java.util.concurrent.locks.ReentrantReadWriteLock;
62
63 import org.apache.commons.logging.Log;
64 import org.apache.commons.logging.LogFactory;
65 import org.apache.hadoop.conf.Configuration;
66 import org.apache.hadoop.fs.FileStatus;
67 import org.apache.hadoop.fs.FileSystem;
68 import org.apache.hadoop.fs.Path;
69 import org.apache.hadoop.hbase.Cell;
70 import org.apache.hadoop.hbase.CellScanner;
71 import org.apache.hadoop.hbase.CellUtil;
72 import org.apache.hadoop.hbase.CompoundConfiguration;
73 import org.apache.hadoop.hbase.DoNotRetryIOException;
74 import org.apache.hadoop.hbase.DroppedSnapshotException;
75 import org.apache.hadoop.hbase.HBaseConfiguration;
76 import org.apache.hadoop.hbase.HColumnDescriptor;
77 import org.apache.hadoop.hbase.HConstants;
78 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
79 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
80 import org.apache.hadoop.hbase.HRegionInfo;
81 import org.apache.hadoop.hbase.HTableDescriptor;
82 import org.apache.hadoop.hbase.KeyValue;
83 import org.apache.hadoop.hbase.KeyValueUtil;
84 import org.apache.hadoop.hbase.NamespaceDescriptor;
85 import org.apache.hadoop.hbase.NotServingRegionException;
86 import org.apache.hadoop.hbase.RegionTooBusyException;
87 import org.apache.hadoop.hbase.TableName;
88 import org.apache.hadoop.hbase.Tag;
89 import org.apache.hadoop.hbase.TagType;
90 import org.apache.hadoop.hbase.UnknownScannerException;
91 import org.apache.hadoop.hbase.backup.HFileArchiver;
92 import org.apache.hadoop.hbase.classification.InterfaceAudience;
93 import org.apache.hadoop.hbase.client.Append;
94 import org.apache.hadoop.hbase.client.Delete;
95 import org.apache.hadoop.hbase.client.Durability;
96 import org.apache.hadoop.hbase.client.Get;
97 import org.apache.hadoop.hbase.client.Increment;
98 import org.apache.hadoop.hbase.client.IsolationLevel;
99 import org.apache.hadoop.hbase.client.Mutation;
100 import org.apache.hadoop.hbase.client.Put;
101 import org.apache.hadoop.hbase.client.Result;
102 import org.apache.hadoop.hbase.client.RowMutations;
103 import org.apache.hadoop.hbase.client.Scan;
104 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
105 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
106 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
107 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
108 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
109 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
110 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
111 import org.apache.hadoop.hbase.filter.FilterWrapper;
112 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
113 import org.apache.hadoop.hbase.io.HeapSize;
114 import org.apache.hadoop.hbase.io.TimeRange;
115 import org.apache.hadoop.hbase.io.hfile.BlockCache;
116 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
117 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
118 import org.apache.hadoop.hbase.ipc.RpcCallContext;
119 import org.apache.hadoop.hbase.ipc.RpcServer;
120 import org.apache.hadoop.hbase.master.AssignmentManager;
121 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
122 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
124 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
125 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
126 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
127 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
128 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
129 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
130 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
131 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
132 import org.apache.hadoop.hbase.regionserver.wal.HLog;
133 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
134 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
135 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
136 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
137 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
138 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
139 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
140 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
141 import org.apache.hadoop.hbase.util.Bytes;
142 import org.apache.hadoop.hbase.util.CancelableProgressable;
143 import org.apache.hadoop.hbase.util.ClassSize;
144 import org.apache.hadoop.hbase.util.CompressionTest;
145 import org.apache.hadoop.hbase.util.EncryptionTest;
146 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
147 import org.apache.hadoop.hbase.util.FSTableDescriptors;
148 import org.apache.hadoop.hbase.util.FSUtils;
149 import org.apache.hadoop.hbase.util.HashedBytes;
150 import org.apache.hadoop.hbase.util.Pair;
151 import org.apache.hadoop.hbase.util.Threads;
152 import org.apache.hadoop.io.MultipleIOException;
153 import org.apache.hadoop.util.StringUtils;
154 import org.cliffc.high_scale_lib.Counter;
155 import org.cloudera.htrace.Trace;
156 import org.cloudera.htrace.TraceScope;
157
158 import com.google.common.annotations.VisibleForTesting;
159 import com.google.common.base.Preconditions;
160 import com.google.common.collect.Lists;
161 import com.google.common.collect.Maps;
162 import com.google.common.io.Closeables;
163 import com.google.protobuf.Descriptors;
164 import com.google.protobuf.Message;
165 import com.google.protobuf.RpcCallback;
166 import com.google.protobuf.RpcController;
167 import com.google.protobuf.Service;
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 @InterfaceAudience.Private
206 public class HRegion implements HeapSize {
/** Logger shared by all HRegion instances. */
public static final Log LOG = LogFactory.getLog(HRegion.class);

/**
 * Configuration key for the default "load column families on demand" setting of scans.
 * Read in the constructor with a default of {@code true}.
 */
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
  "hbase.hregion.scan.loadColumnFamiliesOnDemand";

/**
 * Durability used when the table descriptor asks for {@link Durability#USE_DEFAULT}.
 * NOTE(review): the constant name is misspelled ("DURABLITY"); kept as-is because it is
 * referenced elsewhere in this class.
 */
private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;

/** Becomes true once the region has been closed; reset to false on (re)initialization. */
final AtomicBoolean closed = new AtomicBoolean(false);

/**
 * Becomes true while a close is in progress (set before the region write lock is taken during
 * close); reset to false on (re)initialization.
 */
final AtomicBoolean closing = new AtomicBoolean(false);

// Starts at -1; NOTE(review): presumably the sequence id of the last completed flush —
// it is not assigned in the visible part of the file, confirm against its writers.
protected volatile long completeSequenceId = -1L;

// Region sequence id holder, starting at -1. NOTE(review): updated outside this excerpt;
// confirm exact semantics against its users.
private final AtomicLong sequenceId = new AtomicLong(-1L);

/**
 * Kinds of operations that can be performed against a region. The consumers of these values
 * are outside this excerpt.
 */
public enum Operation {
  ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
  REPLAY_BATCH_MUTATE, COMPACT_REGION
}
245
246
247
248
249
250
251
252
253
254
// Row locks currently held in this region, keyed by the hashed row key.
// NOTE(review): maintained by row-lock code outside this excerpt.
private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
  new ConcurrentHashMap<HashedBytes, RowLockContext>();

/**
 * Open stores of this region keyed by column family name bytes, ordered with
 * {@link Bytes#BYTES_RAWCOMPARATOR}. Populated when the region's stores are opened.
 */
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
  Bytes.BYTES_RAWCOMPARATOR);

// Coprocessor protobuf services registered on this region; presumably keyed by service
// name (registration code is outside this excerpt — TODO confirm).
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

/** Current memstore size of this region; updated by addAndGetGlobalMemstoreSize(long). */
public final AtomicLong memstoreSize = new AtomicLong(0);

// Metrics: mutations applied without a WAL write, and the data volume they represent.
final Counter numMutationsWithoutWAL = new Counter();
final Counter dataInMemoryWithoutWAL = new Counter();

// Metrics: outcomes of checkAndMutate-style checks.
final Counter checkAndMutateChecksPassed = new Counter();
final Counter checkAndMutateChecksFailed = new Counter();

// Metrics: request counters reported by getReadRequestsCount()/getWriteRequestsCount().
final Counter readRequestsCount = new Counter();
final Counter writeRequestsCount = new Counter();

// Metrics: requests recorded as blocked; incremented outside this excerpt.
private final Counter blockedRequestsCount = new Counter();

/**
 * Returns the number of requests that have been recorded as blocked on this region.
 *
 * @return current value of the blocked-requests counter
 */
public long getBlockedRequestsCount() {
  return this.blockedRequestsCount.get();
}
287
288
// Compaction statistics for this region.
final AtomicLong compactionsFinished = new AtomicLong(0L);
final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

// WAL and filesystem layout handed in by the constructor.
private final HLog log;
private final HRegionFileSystem fs;
// Merged configuration: base conf overlaid with the table descriptor's configuration/values.
protected final Configuration conf;
// The original, un-merged configuration passed to the constructor.
private final Configuration baseConf;
// Key comparator taken from the region info.
private final KeyValue.KVComparator comparator;
// From "hbase.rowlock.wait.duration" (presumably milliseconds — TODO confirm).
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

// From "hbase.busy.wait.duration"; the constructor rejects configurations where
// busyWaitDuration * maxBusyWaitMultiplier is not positive.
final long busyWaitDuration;
static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

// From "hbase.busy.wait.multiplier.max" (default 2).
final int maxBusyWaitMultiplier;

// From "hbase.ipc.client.call.purge.timeout", falling back to "ipc.client.call.purge.timeout"
// and then to twice the default RPC timeout.
final long maxBusyWaitDuration;

// Default for "hbase.hregion.row.processor.timeout": 60 * 1000 (presumably milliseconds).
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
// Per-region cached thread pool. NOTE(review): shutdown is not visible in this excerpt.
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

// Read point of each open scanner; consulted by getSmallestReadPoint().
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

// Sequence number recorded at open time; starts at HConstants.NO_SEQNUM.
private long openSeqNum = HConstants.NO_SEQNUM;

// Default for loading column families on demand; see LOAD_CFS_ON_DEMAND_CONFIG_KEY.
private boolean isLoadingCfsOnDemandDefault = false;

// Number of major / minor compactions currently in progress.
private final AtomicInteger majorInProgress = new AtomicInteger(0);
private final AtomicInteger minorInProgress = new AtomicInteger(0);

// Max sequence id found in each store's files, keyed by family name bytes. Filled while the
// stores are opened and passed to recovered-edits replay.
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

// From HConstants.DISALLOW_WRITES_IN_RECOVERING.
private boolean disallowWritesInRecovering = false;

// True while the region is in recovery; see setRecovering(boolean).
private volatile boolean isRecovering = false;
355
356
357
358
359
360
361 public long getSmallestReadPoint() {
362 long minimumReadPoint;
363
364
365
366 synchronized(scannerReadPoints) {
367 minimumReadPoint = mvcc.memstoreReadPoint();
368
369 for (Long readPoint: this.scannerReadPoints.values()) {
370 if (readPoint < minimumReadPoint) {
371 minimumReadPoint = readPoint;
372 }
373 }
374 }
375 return minimumReadPoint;
376 }
377
378
379
380
381 static class WriteState {
382
383 volatile boolean flushing = false;
384
385 volatile boolean flushRequested = false;
386
387 volatile int compacting = 0;
388
389 volatile boolean writesEnabled = true;
390
391 volatile boolean readOnly = false;
392
393
394
395
396
397
398 synchronized void setReadOnly(final boolean onOff) {
399 this.writesEnabled = !onOff;
400 this.readOnly = onOff;
401 }
402
403 boolean isReadOnly() {
404 return this.readOnly;
405 }
406
407 boolean isFlushRequested() {
408 return this.flushRequested;
409 }
410
411 static final long HEAP_SIZE = ClassSize.align(
412 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
413 }
414
415
416
417
418
419
420
421 public static class FlushResult {
422 enum Result {
423 FLUSHED_NO_COMPACTION_NEEDED,
424 FLUSHED_COMPACTION_NEEDED,
425
426
427 CANNOT_FLUSH_MEMSTORE_EMPTY,
428 CANNOT_FLUSH
429
430 }
431
432 final Result result;
433 final String failureReason;
434 final long flushSequenceId;
435
436
437
438
439
440
441
442
443 FlushResult(Result result, long flushSequenceId) {
444 this(result, flushSequenceId, null);
445 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
446 .FLUSHED_COMPACTION_NEEDED;
447 }
448
449
450
451
452
453
454 FlushResult(Result result, String failureReason) {
455 this(result, -1, failureReason);
456 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
457 }
458
459
460
461
462
463
464
465 FlushResult(Result result, long flushSequenceId, String failureReason) {
466 this.result = result;
467 this.flushSequenceId = flushSequenceId;
468 this.failureReason = failureReason;
469 }
470
471
472
473
474
475
476 public boolean isFlushSucceeded() {
477 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
478 .FLUSHED_COMPACTION_NEEDED;
479 }
480
481
482
483
484
485 public boolean isCompactionNeeded() {
486 return result == Result.FLUSHED_COMPACTION_NEEDED;
487 }
488 }
489
// Flush/compaction/write-enable flags; several methods synchronize on it.
final WriteState writestate = new WriteState();

// Memstore size that triggers a flush; set by setHTableSpecificConf().
long memstoreFlushSize;
// From "hbase.hregion.keyvalue.timestamp.slop.millisecs" (default HConstants.LATEST_TIMESTAMP).
final long timestampSlop;
// From "hbase.hregion.row.processor.timeout".
final long rowProcessorTimeout;
// Wall-clock time of the last flush; initialized when the region is opened.
private volatile long lastFlushTime;
// Hosting region server services; may be null (e.g. standalone use).
final RegionServerServices rsServices;
// Region-server-wide memstore accounting; null when rsServices is null.
private RegionServerAccounting rsAccounting;
// NOTE(review): presumably (time, size) pairs of recent flushes — populated outside this excerpt.
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
// From MEMSTORE_PERIODIC_FLUSH_INTERVAL.
private long flushCheckInterval;

// From MEMSTORE_FLUSH_PER_CHANGES; bounded by MAX_FLUSH_PER_CHANGES in the constructor.
private long flushPerChanges;
// memstoreFlushSize * "hbase.hregion.memstore.block.multiplier"; set by setHTableSpecificConf().
private long blockingMemStoreSize;
// From HConstants.THREAD_WAKE_FREQUENCY (default 10 * 1000).
final long threadWakeFrequency;

// Region-level lock; its write lock is taken while closing the region.
final ReentrantReadWriteLock lock =
  new ReentrantReadWriteLock();

// NOTE(review): presumably guards memstore updates against flush snapshots — confirm.
private final ReentrantReadWriteLock updatesLock =
  new ReentrantReadWriteLock();
// Split request state; managed outside this excerpt.
private boolean splitRequest;
private byte[] explicitSplitPoint = null;

// Multi-version concurrency control for this region's reads and writes.
private final MultiVersionConsistencyControl mvcc =
  new MultiVersionConsistencyControl();

// Coprocessor host; only created when rsServices is non-null, so it may be null.
private RegionCoprocessorHost coprocessorHost;

private HTableDescriptor htableDescriptor = null;
// Created during region initialization.
private RegionSplitPolicy splitPolicy;

// Metrics objects; both are null when rsServices is null.
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
// Table durability, with USE_DEFAULT resolved to DEFAULT_DURABLITY.
private final Durability durability;
// False for system-namespace tables; otherwise from HConstants.ENABLE_CLIENT_BACKPRESSURE.
private final boolean regionStatsEnabled;
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
/**
 * Legacy constructor.
 *
 * Builds the {@link HRegionFileSystem} for {@code regionInfo} under {@code tableDir} and
 * delegates to the {@code HRegionFileSystem}-based constructor with the remaining arguments
 * unchanged.
 *
 * @param tableDir directory of the table the region belongs to
 * @param log WAL the region writes to
 * @param fs filesystem holding the region's files
 * @param confParam base configuration (must not be a CompoundConfiguration)
 * @param regionInfo identity of the region
 * @param htd table descriptor (must not be null)
 * @param rsServices hosting region server services, or null
 * @deprecated retained for existing callers; it only wraps the arguments in an
 *     {@link HRegionFileSystem} and delegates.
 */
@Deprecated
public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
    final Configuration confParam, final HRegionInfo regionInfo,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  this(
      new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      log,
      confParam,
      htd,
      rsServices);
}
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
/**
 * Creates a region backed by the given region filesystem.
 *
 * The effective configuration ({@link #conf}) is a {@link CompoundConfiguration} that layers
 * the table descriptor's configuration map and values over {@code confParam}. The tunables
 * below are read from that merged view.
 *
 * @param fs filesystem layout of the region; also supplies the region info and comparator
 * @param log WAL for this region
 * @param confParam base configuration; must not itself be a CompoundConfiguration
 * @param htd table descriptor; must not be null
 * @param rsServices hosting region server services, or null. When null, no accounting,
 *     coprocessor host or metrics are created.
 * @throws IllegalArgumentException if {@code htd} is null, or {@code confParam} is a
 *     CompoundConfiguration, or MEMSTORE_FLUSH_PER_CHANGES exceeds MAX_FLUSH_PER_CHANGES,
 *     or the busy-wait duration times its multiplier is not positive
 */
public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  if (htd == null) {
    throw new IllegalArgumentException("Need table descriptor");
  }

  // The table-level overrides are layered on below; a pre-layered conf is rejected.
  if (confParam instanceof CompoundConfiguration) {
    throw new IllegalArgumentException("Need original base configuration");
  }

  this.comparator = fs.getRegionInfo().getComparator();
  this.log = log;
  this.fs = fs;

  // Keep the untouched base conf, and build the merged view with table-level overrides.
  this.baseConf = confParam;
  this.conf = new CompoundConfiguration()
    .add(confParam)
    .addStringMap(htd.getConfiguration())
    .addWritableMap(htd.getValues());
  this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
    DEFAULT_CACHE_FLUSH_INTERVAL);
  this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
  if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
    throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
      + MAX_FLUSH_PER_CHANGES);
  }

  this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
    DEFAULT_ROWLOCK_WAIT_DURATION);

  this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
  this.htableDescriptor = htd;
  this.rsServices = rsServices;
  this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  // Needs htableDescriptor and conf, both assigned above.
  setHTableSpecificConf();
  this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

  this.busyWaitDuration = conf.getLong(
    "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
  this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
  // A non-positive product (including a negative setting) is rejected outright.
  if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
    throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
      + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
      + maxBusyWaitMultiplier + "). Their product should be positive");
  }
  // Newer key first, then the legacy Hadoop IPC key, then twice the default RPC timeout.
  this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
    conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

  // Defaults to HConstants.LATEST_TIMESTAMP; NOTE(review): presumably meaning "no slop
  // check" — confirm where timestampSlop is consumed.
  this.timestampSlop = conf.getLong(
    "hbase.hregion.keyvalue.timestamp.slop.millisecs",
    HConstants.LATEST_TIMESTAMP);

  this.rowProcessorTimeout = conf.getLong(
    "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
  // Resolve the table's USE_DEFAULT durability to the region default.
  this.durability = htd.getDurability() == Durability.USE_DEFAULT
    ? DEFAULT_DURABLITY
    : htd.getDurability();
  if (rsServices != null) {
    this.rsAccounting = this.rsServices.getRegionServerAccounting();
    // The coprocessor host and metrics only exist when hosted by a region server.
    this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
    this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
    this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

    Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
    String encodedName = getRegionInfo().getEncodedName();
    // If the region server lists this region as recovering, mark it and register this
    // instance. NOTE(review): this publishes `this` before the constructor has finished.
    if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
      this.isRecovering = true;
      recoveringRegions.put(encodedName, this);
    }
  } else {
    this.metricsRegionWrapper = null;
    this.metricsRegion = null;
  }
  if (LOG.isDebugEnabled()) {
    // NOTE(review): runs before the last two fields below are assigned.
    LOG.debug("Instantiated " + this);
  }

  this.disallowWritesInRecovering =
    conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
      HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);

  // Region stats (client backpressure) are never enabled for system-namespace tables.
  this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ? false :
    conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
      HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
}
676
677 void setHTableSpecificConf() {
678 if (this.htableDescriptor == null) return;
679 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
680
681 if (flushSize <= 0) {
682 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
683 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
684 }
685 this.memstoreFlushSize = flushSize;
686 this.blockingMemStoreSize = this.memstoreFlushSize *
687 conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
688 }
689
690
691
692
693
694
695
696
697
698 @Deprecated
699 public long initialize() throws IOException {
700 return initialize(null);
701 }
702
703
704
705
706
707
708
709
710 private long initialize(final CancelableProgressable reporter) throws IOException {
711 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
712 long nextSeqId = -1;
713 try {
714 nextSeqId = initializeRegionInternals(reporter, status);
715 return nextSeqId;
716 } finally {
717
718
719 if (nextSeqId == -1) {
720 status
721 .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
722 }
723 }
724 }
725
/**
 * Performs the region open. The steps, in order:
 * <ol>
 *   <li>coprocessor pre-open hooks;</li>
 *   <li>region info check on the filesystem;</li>
 *   <li>cleanup of temp, split and merge leftovers;</li>
 *   <li>opening every store (including recovered-edits replay);</li>
 *   <li>resetting the write state and creating the split policy;</li>
 *   <li>coprocessor post-open hooks.</li>
 * </ol>
 *
 * @param reporter progress reporter passed to store initialization (may be null)
 * @param status task used to publish progress; marked complete on success
 * @return the next sequence id: one past the highest sequence id found, plus an extra bump
 *     while the region is recovering
 * @throws IOException if any step fails (UnsupportedEncodingException is an IOException
 *     subclass and is declared only for documentation)
 */
private long initializeRegionInternals(final CancelableProgressable reporter,
    final MonitoredTask status) throws IOException, UnsupportedEncodingException {
  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor pre-open hook");
    coprocessorHost.preOpen();
  }

  // Make sure the region info is present on the filesystem (per the status text, this may
  // write it).
  status.setStatus("Writing region info on filesystem");
  fs.checkRegionInfoOnFilesystem();

  // Remove temporary data left behind by earlier incarnations of the region.
  status.setStatus("Cleaning up temporary data from old regions");
  fs.cleanupTempDir();

  // Open all stores; the result is the highest sequence id from store files and replayed edits.
  status.setStatus("Initializing all the Stores");
  long maxSeqId = initializeRegionStores(reporter, status);

  status.setStatus("Cleaning up detritus from prior splits");
  // Remove leftovers of earlier split and merge attempts.
  // NOTE(review): presumably incomplete ones — confirm against HRegionFileSystem.
  fs.cleanupAnySplitDetritus();
  fs.cleanupMergesDir();

  // Reset write state: writes follow the table's read-only flag; nothing pending.
  this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
  this.writestate.flushRequested = false;
  this.writestate.compacting = 0;

  // The split policy is created only after the stores are open.
  this.splitPolicy = RegionSplitPolicy.create(this, conf);

  this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();

  // Next sequence id is one past the highest one seen.
  long nextSeqid = maxSeqId + 1;
  if (this.isRecovering) {
    // While recovering, skip ahead by flushPerChanges plus a fixed margin.
    // NOTE(review): presumably leaves headroom above sequence ids carried by edits that are
    // still being replayed — confirm.
    nextSeqid += this.flushPerChanges + 10000000;
  }
  LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
    "; next sequenceid=" + nextSeqid);

  // Mark the region as open (clears state left by a previous close).
  this.closing.set(false);
  this.closed.set(false);

  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor post-open hooks");
    coprocessorHost.postOpen();
  }

  status.markComplete("Region opened successfully");
  return nextSeqid;
}
784
785 private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
786 throws IOException, UnsupportedEncodingException {
787
788
789 long maxSeqId = -1;
790
791 long maxMemstoreTS = -1;
792
793 if (!htableDescriptor.getFamilies().isEmpty()) {
794
795 ThreadPoolExecutor storeOpenerThreadPool =
796 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
797 CompletionService<HStore> completionService =
798 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
799
800
801 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
802 status.setStatus("Instantiating store for column family " + family);
803 completionService.submit(new Callable<HStore>() {
804 @Override
805 public HStore call() throws IOException {
806 return instantiateHStore(family);
807 }
808 });
809 }
810 boolean allStoresOpened = false;
811 try {
812 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
813 Future<HStore> future = completionService.take();
814 HStore store = future.get();
815 this.stores.put(store.getColumnFamilyName().getBytes(), store);
816
817 long storeMaxSequenceId = store.getMaxSequenceId();
818 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
819 storeMaxSequenceId);
820 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
821 maxSeqId = storeMaxSequenceId;
822 }
823 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
824 if (maxStoreMemstoreTS > maxMemstoreTS) {
825 maxMemstoreTS = maxStoreMemstoreTS;
826 }
827 }
828 allStoresOpened = true;
829 } catch (InterruptedException e) {
830 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
831 } catch (ExecutionException e) {
832 throw new IOException(e.getCause());
833 } finally {
834 storeOpenerThreadPool.shutdownNow();
835 if (!allStoresOpened) {
836
837 LOG.error("Could not initialize all stores for the region=" + this);
838 for (Store store : this.stores.values()) {
839 try {
840 store.close();
841 } catch (IOException e) {
842 LOG.warn(e.getMessage());
843 }
844 }
845 }
846 }
847 }
848 mvcc.initialize(maxMemstoreTS + 1);
849
850 maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
851 this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
852 return maxSeqId;
853 }
854
855
856
857
858 public boolean hasReferences() {
859 for (Store store : this.stores.values()) {
860 if (store.hasReferences()) return true;
861 }
862 return false;
863 }
864
865
866
867
868
869
870 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
871 HDFSBlocksDistribution hdfsBlocksDistribution =
872 new HDFSBlocksDistribution();
873 synchronized (this.stores) {
874 for (Store store : this.stores.values()) {
875 for (StoreFile sf : store.getStorefiles()) {
876 HDFSBlocksDistribution storeFileBlocksDistribution =
877 sf.getHDFSBlockDistribution();
878 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
879 }
880 }
881 }
882 return hdfsBlocksDistribution;
883 }
884
885
886
887
888
889
890
891
892
893 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
894 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
895 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
896 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
897 }
898
899
900
901
902
903
904
905
906
907
908 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
909 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
910 throws IOException {
911 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
912 FileSystem fs = tablePath.getFileSystem(conf);
913
914 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
915 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
916 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
917 if (storeFiles == null) continue;
918
919 for (StoreFileInfo storeFileInfo : storeFiles) {
920 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
921 }
922 }
923 return hdfsBlocksDistribution;
924 }
925
926 public AtomicLong getMemstoreSize() {
927 return memstoreSize;
928 }
929
930
931
932
933
934
935
936 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
937 if (this.rsAccounting != null) {
938 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
939 }
940 return this.memstoreSize.addAndGet(memStoreSize);
941 }
942
943
944 public HRegionInfo getRegionInfo() {
945 return this.fs.getRegionInfo();
946 }
947
948
949
950
951
952 RegionServerServices getRegionServerServices() {
953 return this.rsServices;
954 }
955
956
957
958
959 public RegionSplitPolicy getSplitPolicy() {
960 return this.splitPolicy;
961 }
962
963
964 long getReadRequestsCount() {
965 return this.readRequestsCount.get();
966 }
967
968
969 long getWriteRequestsCount() {
970 return this.writeRequestsCount.get();
971 }
972
973 public MetricsRegion getMetrics() {
974 return metricsRegion;
975 }
976
977
978 public boolean isClosed() {
979 return this.closed.get();
980 }
981
982
983
984
985 public boolean isClosing() {
986 return this.closing.get();
987 }
988
989
990
991
992
993 public void setRecovering(boolean newState) {
994 boolean wasRecovering = this.isRecovering;
995 this.isRecovering = newState;
996 if (wasRecovering && !isRecovering) {
997
998 coprocessorHost.postLogReplay();
999 }
1000 }
1001
1002
1003
1004
1005 public boolean isRecovering() {
1006 return this.isRecovering;
1007 }
1008
1009
1010 public boolean isAvailable() {
1011 return !isClosed() && !isClosing();
1012 }
1013
1014
1015 public boolean isSplittable() {
1016 return isAvailable() && !hasReferences();
1017 }
1018
1019
1020
1021
1022 public boolean isMergeable() {
1023 if (!isAvailable()) {
1024 LOG.debug("Region " + this.getRegionNameAsString()
1025 + " is not mergeable because it is closing or closed");
1026 return false;
1027 }
1028 if (hasReferences()) {
1029 LOG.debug("Region " + this.getRegionNameAsString()
1030 + " is not mergeable because it has references");
1031 return false;
1032 }
1033
1034 return true;
1035 }
1036
1037 public boolean areWritesEnabled() {
1038 synchronized(this.writestate) {
1039 return this.writestate.writesEnabled;
1040 }
1041 }
1042
1043 public MultiVersionConsistencyControl getMVCC() {
1044 return mvcc;
1045 }
1046
1047
1048
1049
1050 public long getReadpoint(IsolationLevel isolationLevel) {
1051 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1052
1053 return Long.MAX_VALUE;
1054 }
1055 return mvcc.memstoreReadPoint();
1056 }
1057
1058 public boolean isLoadingCfsOnDemandDefault() {
1059 return this.isLoadingCfsOnDemandDefault;
1060 }
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075 public Map<byte[], List<StoreFile>> close() throws IOException {
1076 return close(false);
1077 }
1078
// Monitor that serializes close(boolean) calls.
private final Object closeLock = new Object();

/** Configuration key for the periodic memstore flush interval. */
public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
  "hbase.regionserver.optionalcacheflushinterval";
/** Default for MEMSTORE_PERIODIC_FLUSH_INTERVAL: 3600000 (one hour, if in milliseconds). */
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;

/**
 * Configuration key for the number of changes after which a flush is wanted.
 * The same value also widens the next sequence id of a recovering region.
 */
public static final String MEMSTORE_FLUSH_PER_CHANGES =
  "hbase.regionserver.flush.per.changes";
public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000;

/**
 * Upper bound for MEMSTORE_FLUSH_PER_CHANGES. The constructor throws
 * IllegalArgumentException if the configured value exceeds it.
 */
public static final long MAX_FLUSH_PER_CHANGES = 1000000000;
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1112
1113
1114 MonitoredTask status = TaskMonitor.get().createStatus(
1115 "Closing region " + this +
1116 (abort ? " due to abort" : ""));
1117
1118 status.setStatus("Waiting for close lock");
1119 try {
1120 synchronized (closeLock) {
1121 return doClose(abort, status);
1122 }
1123 } finally {
1124 status.cleanup();
1125 }
1126 }
1127
1128 private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1129 throws IOException {
1130 if (isClosed()) {
1131 LOG.warn("Region " + this + " already closed");
1132 return null;
1133 }
1134
1135 if (coprocessorHost != null) {
1136 status.setStatus("Running coprocessor pre-close hooks");
1137 this.coprocessorHost.preClose(abort);
1138 }
1139
1140 status.setStatus("Disabling compacts and flushes for region");
1141 synchronized (writestate) {
1142
1143
1144 writestate.writesEnabled = false;
1145 LOG.debug("Closing " + this + ": disabling compactions & flushes");
1146 waitForFlushesAndCompactions();
1147 }
1148
1149
1150
1151 if (!abort && worthPreFlushing()) {
1152 status.setStatus("Pre-flushing region before close");
1153 LOG.info("Running close preflush of " + this.getRegionNameAsString());
1154 try {
1155 internalFlushcache(status);
1156 } catch (IOException ioe) {
1157
1158 status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1159 }
1160 }
1161
1162 this.closing.set(true);
1163 status.setStatus("Disabling writes for close");
1164
1165 lock.writeLock().lock();
1166 try {
1167 if (this.isClosed()) {
1168 status.abort("Already got closed by another process");
1169
1170 return null;
1171 }
1172 LOG.debug("Updates disabled for region " + this);
1173
1174 if (!abort) {
1175 int flushCount = 0;
1176 while (this.getMemstoreSize().get() > 0) {
1177 try {
1178 if (flushCount++ > 0) {
1179 int actualFlushes = flushCount - 1;
1180 if (actualFlushes > 5) {
1181
1182
1183 throw new DroppedSnapshotException("Failed clearing memory after " +
1184 actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1185 }
1186 LOG.info("Running extra flush, " + actualFlushes +
1187 " (carrying snapshot?) " + this);
1188 }
1189 internalFlushcache(status);
1190 } catch (IOException ioe) {
1191 status.setStatus("Failed flush " + this + ", putting online again");
1192 synchronized (writestate) {
1193 writestate.writesEnabled = true;
1194 }
1195
1196 throw ioe;
1197 }
1198 }
1199 }
1200
1201 Map<byte[], List<StoreFile>> result =
1202 new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1203 if (!stores.isEmpty()) {
1204
1205 ThreadPoolExecutor storeCloserThreadPool =
1206 getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1207 CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1208 new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1209
1210
1211 for (final Store store : stores.values()) {
1212 assert abort? true: store.getFlushableSize() == 0;
1213 completionService
1214 .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1215 @Override
1216 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1217 return new Pair<byte[], Collection<StoreFile>>(
1218 store.getFamily().getName(), store.close());
1219 }
1220 });
1221 }
1222 try {
1223 for (int i = 0; i < stores.size(); i++) {
1224 Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1225 Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1226 List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1227 if (familyFiles == null) {
1228 familyFiles = new ArrayList<StoreFile>();
1229 result.put(storeFiles.getFirst(), familyFiles);
1230 }
1231 familyFiles.addAll(storeFiles.getSecond());
1232 }
1233 } catch (InterruptedException e) {
1234 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1235 } catch (ExecutionException e) {
1236 throw new IOException(e.getCause());
1237 } finally {
1238 storeCloserThreadPool.shutdownNow();
1239 }
1240 }
1241 this.closed.set(true);
1242 if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1243 if (coprocessorHost != null) {
1244 status.setStatus("Running coprocessor post-close hooks");
1245 this.coprocessorHost.postClose(abort);
1246 }
1247 if ( this.metricsRegion != null) {
1248 this.metricsRegion.close();
1249 }
1250 if ( this.metricsRegionWrapper != null) {
1251 Closeables.closeQuietly(this.metricsRegionWrapper);
1252 }
1253 status.markComplete("Closed");
1254 LOG.info("Closed " + this);
1255 return result;
1256 } finally {
1257 lock.writeLock().unlock();
1258 }
1259 }
1260
1261
1262
1263
1264
1265
1266 public void waitForFlushesAndCompactions() {
1267 synchronized (writestate) {
1268 while (writestate.compacting > 0 || writestate.flushing) {
1269 LOG.debug("waiting for " + writestate.compacting + " compactions"
1270 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1271 try {
1272 writestate.wait();
1273 } catch (InterruptedException iex) {
1274
1275 Thread.currentThread().interrupt();
1276 }
1277 }
1278 }
1279 }
1280
1281 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1282 final String threadNamePrefix) {
1283 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1284 int maxThreads = Math.min(numStores,
1285 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1286 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1287 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1288 }
1289
1290 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1291 final String threadNamePrefix) {
1292 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1293 int maxThreads = Math.max(1,
1294 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1295 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1296 / numStores);
1297 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1298 }
1299
1300 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1301 final String threadNamePrefix) {
1302 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1303 new ThreadFactory() {
1304 private int count = 1;
1305
1306 @Override
1307 public Thread newThread(Runnable r) {
1308 return new Thread(r, threadNamePrefix + "-" + count++);
1309 }
1310 });
1311 }
1312
1313
1314
1315
1316 private boolean worthPreFlushing() {
1317 return this.memstoreSize.get() >
1318 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1319 }
1320
1321
1322
1323
1324
1325
1326 public byte [] getStartKey() {
1327 return this.getRegionInfo().getStartKey();
1328 }
1329
1330
1331 public byte [] getEndKey() {
1332 return this.getRegionInfo().getEndKey();
1333 }
1334
1335
1336 public long getRegionId() {
1337 return this.getRegionInfo().getRegionId();
1338 }
1339
1340
1341 public byte [] getRegionName() {
1342 return this.getRegionInfo().getRegionName();
1343 }
1344
1345
1346 public String getRegionNameAsString() {
1347 return this.getRegionInfo().getRegionNameAsString();
1348 }
1349
1350
1351 public HTableDescriptor getTableDesc() {
1352 return this.htableDescriptor;
1353 }
1354
1355
1356 public HLog getLog() {
1357 return this.log;
1358 }
1359
1360
1361
1362
1363
1364
1365
1366
1367 Configuration getBaseConf() {
1368 return this.baseConf;
1369 }
1370
1371
1372 public FileSystem getFilesystem() {
1373 return fs.getFileSystem();
1374 }
1375
1376
1377 public HRegionFileSystem getRegionFileSystem() {
1378 return this.fs;
1379 }
1380
1381
1382 public long getLastFlushTime() {
1383 return this.lastFlushTime;
1384 }
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394 public long getLargestHStoreSize() {
1395 long size = 0;
1396 for (Store h : stores.values()) {
1397 long storeSize = h.getSize();
1398 if (storeSize > size) {
1399 size = storeSize;
1400 }
1401 }
1402 return size;
1403 }
1404
1405
1406
1407
1408 public KeyValue.KVComparator getComparator() {
1409 return this.comparator;
1410 }
1411
1412
1413
1414
1415
1416 protected void doRegionCompactionPrep() throws IOException {
1417 }
1418
1419 void triggerMajorCompaction() {
1420 for (Store h : stores.values()) {
1421 h.triggerMajorCompaction();
1422 }
1423 }
1424
1425
1426
1427
1428
1429
1430
1431
1432 public void compactStores(final boolean majorCompaction)
1433 throws IOException {
1434 if (majorCompaction) {
1435 this.triggerMajorCompaction();
1436 }
1437 compactStores();
1438 }
1439
1440
1441
1442
1443
1444
1445
1446 public void compactStores() throws IOException {
1447 for (Store s : getStores().values()) {
1448 CompactionContext compaction = s.requestCompaction();
1449 if (compaction != null) {
1450 compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
1451 }
1452 }
1453 }
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
/**
 * Runs a previously selected compaction on one store of this region.
 * <p>
 * Holds the region read lock for the whole operation. Region close takes the write lock, so the
 * two cannot overlap. While the store compaction runs, {@code writestate.compacting} is
 * incremented. It is decremented afterwards, and waiters on {@code writestate} are notified when
 * it reaches zero.
 *
 * @param compaction the compaction context; must be non-null with a non-empty file selection
 * @param store the store to compact; must be the instance currently registered for its family
 * @param throughputController passed through to {@code Store.compact}
 * @return true if the compaction ran; false if it was skipped (region closing/closed, store
 *         re-instantiated, writes disabled) or interrupted
 * @throws IOException if the store compaction fails with a non-interrupt I/O error
 */
public boolean compact(CompactionContext compaction, Store store,
    CompactionThroughputController throughputController) throws IOException {
  assert compaction != null && compaction.hasSelection();
  assert !compaction.getRequest().getFiles().isEmpty();
  // Cheap early exit before taking any lock; the requested compaction is cancelled on the store.
  if (this.closing.get() || this.closed.get()) {
    LOG.debug("Skipping compaction on " + this + " because closing/closed");
    store.cancelRequestedCompaction(compaction);
    return false;
  }
  MonitoredTask status = null;
  // Stays true until just before store.compact() is called; while true, the outer finally
  // cancels the request on the store.
  boolean requestNeedsCancellation = true;

  lock.readLock().lock();
  try {
    byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
    // The store registered for this family may have been replaced since the request was made
    // (the log message cites a split rollback); do not compact the stale instance.
    if (stores.get(cf) != store) {
      LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
          + " has been re-instantiated, cancel this compaction request. "
          + " It may be caused by the roll back of split transaction");
      return false;
    }

    status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
    // Re-check under the lock: the region may have closed after the early check above.
    if (this.closed.get()) {
      String msg = "Skipping compaction on " + this + " because closed";
      LOG.debug(msg);
      status.abort(msg);
      return false;
    }
    boolean wasStateSet = false;
    try {
      // Register this compaction unless writes are disabled (region close disables them).
      synchronized (writestate) {
        if (writestate.writesEnabled) {
          wasStateSet = true;
          ++writestate.compacting;
        } else {
          String msg = "NOT compacting region " + this + ". Writes disabled.";
          LOG.info(msg);
          status.abort(msg);
          return false;
        }
      }
      LOG.info("Starting compaction on " + store + " in region " + this
          + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
      doRegionCompactionPrep();
      try {
        status.setStatus("Compacting store " + store);
        // From here on the request is handed to the store; the outer finally must not
        // cancel it.
        requestNeedsCancellation = false;
        store.compact(compaction, throughputController);
      } catch (InterruptedIOException iioe) {
        String msg = "compaction interrupted";
        LOG.info(msg, iioe);
        status.abort(msg);
        return false;
      }
    } finally {
      // Undo the registration and wake waitForFlushesAndCompactions() callers when idle.
      if (wasStateSet) {
        synchronized (writestate) {
          --writestate.compacting;
          if (writestate.compacting <= 0) {
            writestate.notifyAll();
          }
        }
      }
    }
    status.markComplete("Compaction complete");
    return true;
  } finally {
    // Cleanup runs in a nested try so the read lock is released even if cleanup throws.
    try {
      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
      if (status != null) status.cleanup();
    } finally {
      lock.readLock().unlock();
    }
  }
}
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
/**
 * Flushes this region's memstore to store files.
 * <p>
 * Returns a {@code CANNOT_FLUSH} result without flushing when:
 * <ul>
 *   <li>the region is closing or closed,</li>
 *   <li>another flush is already running, or</li>
 *   <li>writes are disabled.</li>
 * </ul>
 * Otherwise it sets {@code writestate.flushing}, runs the coprocessor pre-flush hook, delegates
 * to {@link #internalFlushcache(MonitoredTask)}, and runs the post-flush hook. The region read
 * lock is held throughout.
 *
 * @return the outcome of the flush attempt
 * @throws IOException if a coprocessor hook or the flush itself fails
 */
public FlushResult flushcache() throws IOException {
  // Early exit without taking the lock; close() drives its own flushes.
  if (this.closing.get()) {
    String msg = "Skipping flush on " + this + " because closing";
    LOG.debug(msg);
    return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
  }
  MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
  status.setStatus("Acquiring readlock on region");

  lock.readLock().lock();
  try {
    if (this.closed.get()) {
      String msg = "Skipping flush on " + this + " because closed";
      LOG.debug(msg);
      status.abort(msg);
      return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
    }
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-flush hooks");
      coprocessorHost.preFlush();
    }
    // Reset the counters of data written without the WAL (presumably because this flush
    // persists it).
    if (numMutationsWithoutWAL.get() > 0) {
      numMutationsWithoutWAL.set(0);
      dataInMemoryWithoutWAL.set(0);
    }
    // Only one flush at a time, and none while writes are disabled.
    synchronized (writestate) {
      if (!writestate.flushing && writestate.writesEnabled) {
        this.writestate.flushing = true;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("NOT flushing memstore for region " + this
              + ", flushing=" + writestate.flushing + ", writesEnabled="
              + writestate.writesEnabled);
        }
        String msg = "Not flushing since "
            + (writestate.flushing ? "already flushing"
            : "writes not enabled");
        status.abort(msg);
        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
      }
    }
    try {
      FlushResult fs = internalFlushcache(status);

      if (coprocessorHost != null) {
        status.setStatus("Running post-flush coprocessor hooks");
        coprocessorHost.postFlush();
      }

      status.markComplete("Flush successful");
      return fs;
    } finally {
      // Always clear the flushing flag and wake waitForFlushesAndCompactions() callers.
      synchronized (writestate) {
        writestate.flushing = false;
        this.writestate.flushRequested = false;
        writestate.notifyAll();
      }
    }
  } finally {
    lock.readLock().unlock();
    status.cleanup();
  }
}
1632
1633
1634
1635
1636 boolean shouldFlush() {
1637
1638 if (this.completeSequenceId > 0
1639 && (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get())) {
1640 return true;
1641 }
1642 if (flushCheckInterval <= 0) {
1643 return false;
1644 }
1645 long now = EnvironmentEdgeManager.currentTimeMillis();
1646
1647 if ((now - getLastFlushTime() < flushCheckInterval)) {
1648 return false;
1649 }
1650
1651
1652 for (Store s : this.getStores().values()) {
1653 if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1654
1655 return true;
1656 }
1657 }
1658 return false;
1659 }
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696 protected FlushResult internalFlushcache(MonitoredTask status)
1697 throws IOException {
1698 return internalFlushcache(this.log, -1, status);
1699 }
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
/**
 * Flushes the memstore of every store in this region.
 * <p>
 * Phases:
 * <ol>
 *   <li>Under the updates write lock: begin and advance an MVCC entry, pick the flush sequence
 *       id, and snapshot each store through a {@code StoreFlushContext}.</li>
 *   <li>Sync the WAL when {@code shouldSyncLog()} is false.</li>
 *   <li>Wait on the MVCC entry.</li>
 *   <li>Write each snapshot and commit it.</li>
 *   <li>Complete the WAL cache flush, then record the flush time and completed sequence id.</li>
 * </ol>
 *
 * @param wal WAL to coordinate with; may be null, in which case {@code myseqid} is used as the
 *        flush sequence id
 * @param myseqid sequence id to use when {@code wal} is null
 * @param status task status to update while flushing
 * @return {@code CANNOT_FLUSH_MEMSTORE_EMPTY} if there is nothing to flush, {@code CANNOT_FLUSH}
 *         if the WAL refuses to start a cache flush, otherwise
 *         {@code FLUSHED_COMPACTION_NEEDED} / {@code FLUSHED_NO_COMPACTION_NEEDED}
 * @throws IOException if the server is aborted before flushing starts
 * @throws DroppedSnapshotException wrapping any failure while writing or committing the
 *         snapshots (the WAL cache flush is aborted first)
 */
protected FlushResult internalFlushcache(
    final HLog wal, final long myseqid, MonitoredTask status)
    throws IOException {
  if (this.rsServices != null && this.rsServices.isAborted()) {
    // Do not start a flush on a server that is going down.
    throw new IOException("Aborting flush because server is abortted...");
  }
  final long startTime = EnvironmentEdgeManager.currentTimeMillis();

  if (this.memstoreSize.get() <= 0) {
    if(LOG.isDebugEnabled()) {
      LOG.debug("Empty memstore size for the current region "+this);
    }
    return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
  }

  LOG.info("Started memstore flush for " + this +
      ", current region memstore size " +
      StringUtils.humanReadableInt(this.memstoreSize.get()) +
      ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));

  // MVCC entry begun under the updates lock; waited on after the lock is released.
  MultiVersionConsistencyControl.WriteEntry w = null;

  // Block concurrent updates while the stores are snapshotted.
  status.setStatus("Obtaining lock to block concurrent updates");

  this.updatesLock.writeLock().lock();
  long totalFlushableSize = 0;
  status.setStatus("Preparing to flush by snapshotting stores");
  List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
  long flushSeqId = -1L;
  try {
    w = mvcc.beginMemstoreInsert();
    mvcc.advanceMemstore(w);

    if (wal != null) {
      if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
        String msg = "Flush will not be started for ["
            + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
        status.setStatus(msg);
        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
      }
      flushSeqId = this.sequenceId.incrementAndGet();
    } else {
      // No WAL: use the caller-supplied sequence id.
      flushSeqId = myseqid;
    }

    for (Store s : stores.values()) {
      totalFlushableSize += s.getFlushableSize();
      storeFlushCtxs.add(s.createFlushContext(flushSeqId));
    }

    // Snapshot every store while updates are still blocked.
    // NOTE(review): if prepare() throws here, the exception propagates without
    // wal.abortCacheFlush(); confirm this is intended.
    for (StoreFlushContext flush : storeFlushCtxs) {
      flush.prepare();
    }
  } finally {
    this.updatesLock.writeLock().unlock();
  }
  String s = "Finished memstore snapshotting " + this +
      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
  status.setStatus(s);
  if (LOG.isTraceEnabled()) LOG.trace(s);

  // Sync the WAL explicitly when this region does not sync on each write.
  if (wal != null && !shouldSyncLog()) {
    wal.sync();
  }

  // Wait on the MVCC entry begun above (presumably until earlier in-flight writes are
  // visible) before writing the snapshots out.
  mvcc.waitForRead(w);

  s = "Flushing stores of " + this;
  status.setStatus(s);
  if (LOG.isTraceEnabled()) LOG.trace(s);

  boolean compactionRequested = false;
  try {
    // Write every snapshot to store files.
    for (StoreFlushContext flush : storeFlushCtxs) {
      flush.flushCache(status);
    }

    // Commit the flushed files; any store may ask for a follow-up compaction.
    for (StoreFlushContext flush : storeFlushCtxs) {
      boolean needsCompaction = flush.commit(status);
      if (needsCompaction) {
        compactionRequested = true;
      }
    }
    storeFlushCtxs.clear();

    // The flushed data no longer counts against the memstore size.
    this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
  } catch (Throwable t) {
    // Any failure here loses the snapshot: abort the WAL cache flush and surface a
    // DroppedSnapshotException carrying the original cause.
    if (wal != null) {
      wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
    }
    DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
        Bytes.toStringBinary(getRegionName()));
    dse.initCause(t);
    status.abort("Flush failed: " + StringUtils.stringifyException(t));
    throw dse;
  }

  if (wal != null) {
    wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
  }

  this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();

  completeSequenceId = flushSeqId;

  // Wake any threads waiting on this region's monitor.
  synchronized (this) {
    notifyAll();
  }

  long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
  long memstoresize = this.memstoreSize.get();
  String msg = "Finished memstore flush of ~" +
      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
      ", currentsize=" +
      StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
      " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
      ", compaction requested=" + compactionRequested +
      ((wal == null)? "; wal=null": "");
  LOG.info(msg);
  status.setStatus(msg);
  // Records (seconds taken, bytes flushed).
  this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));

  return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
      FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
}
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893 Result getClosestRowBefore(final byte [] row)
1894 throws IOException{
1895 return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1896 }
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908 public Result getClosestRowBefore(final byte [] row, final byte [] family)
1909 throws IOException {
1910 if (coprocessorHost != null) {
1911 Result result = new Result();
1912 if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1913 return result;
1914 }
1915 }
1916
1917
1918 checkRow(row, "getClosestRowBefore");
1919 startRegionOperation(Operation.GET);
1920 this.readRequestsCount.increment();
1921 try {
1922 Store store = getStore(family);
1923
1924 KeyValue key = store.getRowKeyAtOrBefore(row);
1925 Result result = null;
1926 if (key != null) {
1927 Get get = new Get(key.getRow());
1928 get.addFamily(family);
1929 result = get(get);
1930 }
1931 if (coprocessorHost != null) {
1932 coprocessorHost.postGetClosestRowBefore(row, family, result);
1933 }
1934 return result;
1935 } finally {
1936 closeRegionOperation(Operation.GET);
1937 }
1938 }
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950 public RegionScanner getScanner(Scan scan) throws IOException {
1951 return getScanner(scan, null);
1952 }
1953
1954 void prepareScanner(Scan scan) throws IOException {
1955 if(!scan.hasFamilies()) {
1956
1957 for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1958 scan.addFamily(family);
1959 }
1960 }
1961 }
1962
1963 protected RegionScanner getScanner(Scan scan,
1964 List<KeyValueScanner> additionalScanners) throws IOException {
1965 startRegionOperation(Operation.SCAN);
1966 try {
1967
1968 prepareScanner(scan);
1969 if(scan.hasFamilies()) {
1970 for(byte [] family : scan.getFamilyMap().keySet()) {
1971 checkFamily(family);
1972 }
1973 }
1974 return instantiateRegionScanner(scan, additionalScanners);
1975 } finally {
1976 closeRegionOperation(Operation.SCAN);
1977 }
1978 }
1979
1980 protected RegionScanner instantiateRegionScanner(Scan scan,
1981 List<KeyValueScanner> additionalScanners) throws IOException {
1982 if (scan.isReversed()) {
1983 if (scan.getFilter() != null) {
1984 scan.getFilter().setReversed(true);
1985 }
1986 return new ReversedRegionScannerImpl(scan, additionalScanners, this);
1987 }
1988 return new RegionScannerImpl(scan, additionalScanners, this);
1989 }
1990
1991
1992
1993
1994 void prepareDelete(Delete delete) throws IOException {
1995
1996 if(delete.getFamilyCellMap().isEmpty()){
1997 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1998
1999 delete.deleteFamily(family, delete.getTimeStamp());
2000 }
2001 } else {
2002 for(byte [] family : delete.getFamilyCellMap().keySet()) {
2003 if(family == null) {
2004 throw new NoSuchColumnFamilyException("Empty family is invalid");
2005 }
2006 checkFamily(family);
2007 }
2008 }
2009 }
2010
2011
2012
2013
2014
2015
2016
2017
2018 public void delete(Delete delete)
2019 throws IOException {
2020 checkReadOnly();
2021 checkResources();
2022 startRegionOperation(Operation.DELETE);
2023 try {
2024 delete.getRow();
2025
2026 doBatchMutate(delete);
2027 } finally {
2028 closeRegionOperation(Operation.DELETE);
2029 }
2030 }
2031
2032
2033
2034
2035 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2036
2037
2038
2039
2040
2041
2042 void delete(NavigableMap<byte[], List<Cell>> familyMap,
2043 Durability durability) throws IOException {
2044 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2045 delete.setFamilyCellMap(familyMap);
2046 delete.setDurability(durability);
2047 doBatchMutate(delete);
2048 }
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058 void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2059 byte[] byteNow) throws IOException {
2060 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2061
2062 byte[] family = e.getKey();
2063 List<Cell> cells = e.getValue();
2064 assert cells instanceof RandomAccess;
2065
2066 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2067 int listSize = cells.size();
2068 for (int i=0; i < listSize; i++) {
2069 Cell cell = cells.get(i);
2070 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2071
2072
2073 if (kv.isLatestTimestamp() && kv.isDeleteType()) {
2074 byte[] qual = kv.getQualifier();
2075 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2076
2077 Integer count = kvCount.get(qual);
2078 if (count == null) {
2079 kvCount.put(qual, 1);
2080 } else {
2081 kvCount.put(qual, count + 1);
2082 }
2083 count = kvCount.get(qual);
2084
2085 Get get = new Get(kv.getRow());
2086 get.setMaxVersions(count);
2087 get.addColumn(family, qual);
2088 if (coprocessorHost != null) {
2089 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2090 byteNow, get)) {
2091 updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2092 }
2093 } else {
2094 updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2095 }
2096 } else {
2097 kv.updateLatestStamp(byteNow);
2098 }
2099 }
2100 }
2101 }
2102
2103 void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
2104 throws IOException {
2105 List<Cell> result = get(get, false);
2106
2107 if (result.size() < count) {
2108
2109 kv.updateLatestStamp(byteNow);
2110 return;
2111 }
2112 if (result.size() > count) {
2113 throw new RuntimeException("Unexpected size: " + result.size());
2114 }
2115 KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
2116 Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
2117 getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2118 }
2119
2120
2121
2122
2123
2124 public void put(Put put)
2125 throws IOException {
2126 checkReadOnly();
2127
2128
2129
2130
2131
2132 checkResources();
2133 startRegionOperation(Operation.PUT);
2134 try {
2135
2136 doBatchMutate(put);
2137 } finally {
2138 closeRegionOperation(Operation.PUT);
2139 }
2140 }
2141
2142
2143
2144
2145
2146
2147 private abstract static class BatchOperationInProgress<T> {
2148 T[] operations;
2149 int nextIndexToProcess = 0;
2150 OperationStatus[] retCodeDetails;
2151 WALEdit[] walEditsFromCoprocessors;
2152
2153 public BatchOperationInProgress(T[] operations) {
2154 this.operations = operations;
2155 this.retCodeDetails = new OperationStatus[operations.length];
2156 this.walEditsFromCoprocessors = new WALEdit[operations.length];
2157 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2158 }
2159
2160 public abstract Mutation getMutation(int index);
2161 public abstract long getNonceGroup(int index);
2162 public abstract long getNonce(int index);
2163
2164 public abstract Mutation[] getMutationsForCoprocs();
2165 public abstract boolean isInReplay();
2166
2167 public boolean isDone() {
2168 return nextIndexToProcess == operations.length;
2169 }
2170 }
2171
2172 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2173 private long nonceGroup;
2174 private long nonce;
2175 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2176 super(operations);
2177 this.nonceGroup = nonceGroup;
2178 this.nonce = nonce;
2179 }
2180
2181 public Mutation getMutation(int index) {
2182 return this.operations[index];
2183 }
2184
2185 @Override
2186 public long getNonceGroup(int index) {
2187 return nonceGroup;
2188 }
2189
2190 @Override
2191 public long getNonce(int index) {
2192 return nonce;
2193 }
2194
2195 @Override
2196 public Mutation[] getMutationsForCoprocs() {
2197 return this.operations;
2198 }
2199
2200 @Override
2201 public boolean isInReplay() {
2202 return false;
2203 }
2204 }
2205
2206 private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2207 public ReplayBatch(MutationReplay[] operations) {
2208 super(operations);
2209 }
2210
2211 @Override
2212 public Mutation getMutation(int index) {
2213 return this.operations[index].mutation;
2214 }
2215
2216 @Override
2217 public long getNonceGroup(int index) {
2218 return this.operations[index].nonceGroup;
2219 }
2220
2221 @Override
2222 public long getNonce(int index) {
2223 return this.operations[index].nonce;
2224 }
2225
2226 @Override
2227 public Mutation[] getMutationsForCoprocs() {
2228 assert false;
2229 throw new RuntimeException("Should not be called for replay batch");
2230 }
2231
2232 @Override
2233 public boolean isInReplay() {
2234 return true;
2235 }
2236 }
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246 public OperationStatus[] batchMutate(
2247 Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2248
2249
2250
2251
2252 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2253 }
2254
2255 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2256 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2257 }
2258
2259
2260
2261
2262
2263
2264
2265
2266 public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
2267 throws IOException {
2268 return batchMutate(new ReplayBatch(mutations));
2269 }
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2280 boolean initialized = false;
2281 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2282 startRegionOperation(op);
2283 try {
2284 while (!batchOp.isDone()) {
2285 if (!batchOp.isInReplay()) {
2286 checkReadOnly();
2287 }
2288 checkResources();
2289
2290 if (!initialized) {
2291 this.writeRequestsCount.add(batchOp.operations.length);
2292 if (!batchOp.isInReplay()) {
2293 doPreMutationHook(batchOp);
2294 }
2295 initialized = true;
2296 }
2297 long addedSize = doMiniBatchMutation(batchOp);
2298 long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2299 if (isFlushSize(newSize)) {
2300 requestFlush();
2301 }
2302 }
2303 } finally {
2304 closeRegionOperation(op);
2305 }
2306 return batchOp.retCodeDetails;
2307 }
2308
2309
2310 private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2311 throws IOException {
2312
2313 WALEdit walEdit = new WALEdit();
2314 if (coprocessorHost != null) {
2315 for (int i = 0 ; i < batchOp.operations.length; i++) {
2316 Mutation m = batchOp.getMutation(i);
2317 if (m instanceof Put) {
2318 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2319
2320
2321 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2322 }
2323 } else if (m instanceof Delete) {
2324 Delete curDel = (Delete) m;
2325 if (curDel.getFamilyCellMap().isEmpty()) {
2326
2327 prepareDelete(curDel);
2328 }
2329 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2330
2331
2332 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2333 }
2334 } else {
2335
2336
2337
2338 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2339 "Put/Delete mutations only supported in batchMutate() now");
2340 }
2341 if (!walEdit.isEmpty()) {
2342 batchOp.walEditsFromCoprocessors[i] = walEdit;
2343 walEdit = new WALEdit();
2344 }
2345 }
2346 }
2347 }
2348
2349 @SuppressWarnings("unchecked")
2350 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2351 boolean isInReplay = batchOp.isInReplay();
2352
2353 boolean putsCfSetConsistent = true;
2354
2355 Set<byte[]> putsCfSet = null;
2356
2357 boolean deletesCfSetConsistent = true;
2358
2359 Set<byte[]> deletesCfSet = null;
2360
2361 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2362 WALEdit walEdit = new WALEdit(isInReplay);
2363 MultiVersionConsistencyControl.WriteEntry w = null;
2364 long txid = 0;
2365 boolean doRollBackMemstore = false;
2366 boolean locked = false;
2367
2368
2369 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2370
2371 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2372
2373 int firstIndex = batchOp.nextIndexToProcess;
2374 int lastIndexExclusive = firstIndex;
2375 boolean success = false;
2376 int noOfPuts = 0, noOfDeletes = 0;
2377 try {
2378
2379
2380
2381
2382 int numReadyToWrite = 0;
2383 long now = EnvironmentEdgeManager.currentTimeMillis();
2384 while (lastIndexExclusive < batchOp.operations.length) {
2385 Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2386 boolean isPutMutation = mutation instanceof Put;
2387
2388 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2389
2390 familyMaps[lastIndexExclusive] = familyMap;
2391
2392
2393 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2394 != OperationStatusCode.NOT_RUN) {
2395 lastIndexExclusive++;
2396 continue;
2397 }
2398
2399 try {
2400 if (isPutMutation) {
2401
2402 if (isInReplay) {
2403 removeNonExistentColumnFamilyForReplay(familyMap);
2404 } else {
2405 checkFamilies(familyMap.keySet());
2406 }
2407 checkTimestamps(mutation.getFamilyCellMap(), now);
2408 } else {
2409 prepareDelete((Delete) mutation);
2410 }
2411 } catch (NoSuchColumnFamilyException nscf) {
2412 LOG.warn("No such column family in batch mutation", nscf);
2413 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2414 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2415 lastIndexExclusive++;
2416 continue;
2417 } catch (FailedSanityCheckException fsce) {
2418 LOG.warn("Batch Mutation did not pass sanity check", fsce);
2419 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2420 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2421 lastIndexExclusive++;
2422 continue;
2423 }
2424
2425
2426
2427 boolean shouldBlock = numReadyToWrite == 0;
2428 RowLock rowLock = null;
2429 try {
2430 rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
2431 } catch (IOException ioe) {
2432 LOG.warn("Failed getting lock in batch put, row="
2433 + Bytes.toStringBinary(mutation.getRow()), ioe);
2434 }
2435 if (rowLock == null) {
2436
2437 assert !shouldBlock : "Should never fail to get lock when blocking";
2438 break;
2439 } else {
2440 acquiredRowLocks.add(rowLock);
2441 }
2442
2443 lastIndexExclusive++;
2444 numReadyToWrite++;
2445
2446 if (isPutMutation) {
2447
2448
2449
2450 if (putsCfSet == null) {
2451 putsCfSet = mutation.getFamilyCellMap().keySet();
2452 } else {
2453 putsCfSetConsistent = putsCfSetConsistent
2454 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2455 }
2456 } else {
2457 if (deletesCfSet == null) {
2458 deletesCfSet = mutation.getFamilyCellMap().keySet();
2459 } else {
2460 deletesCfSetConsistent = deletesCfSetConsistent
2461 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2462 }
2463 }
2464 }
2465
2466
2467
2468 now = EnvironmentEdgeManager.currentTimeMillis();
2469 byte[] byteNow = Bytes.toBytes(now);
2470
2471
2472 if (numReadyToWrite <= 0) return 0L;
2473
2474
2475
2476
2477
2478
2479 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2480
2481 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2482 != OperationStatusCode.NOT_RUN) continue;
2483
2484 Mutation mutation = batchOp.getMutation(i);
2485 if (mutation instanceof Put) {
2486 updateKVTimestamps(familyMaps[i].values(), byteNow);
2487 noOfPuts++;
2488 } else {
2489 if (!isInReplay) {
2490 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2491 }
2492 noOfDeletes++;
2493 }
2494 rewriteCellTags(familyMaps[i], mutation);
2495 }
2496
2497 lock(this.updatesLock.readLock(), numReadyToWrite);
2498 locked = true;
2499
2500
2501
2502
2503
2504 w = mvcc.beginMemstoreInsert();
2505
2506
2507 if (!isInReplay && coprocessorHost != null) {
2508 MiniBatchOperationInProgress<Mutation> miniBatchOp =
2509 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2510 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2511 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2512 }
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523 long addedSize = 0;
2524 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2525 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2526 != OperationStatusCode.NOT_RUN) {
2527 continue;
2528 }
2529 doRollBackMemstore = true;
2530 addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2531 }
2532
2533
2534
2535
2536 boolean hasWalAppends = false;
2537 Durability durability = Durability.USE_DEFAULT;
2538 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2539
2540 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2541 != OperationStatusCode.NOT_RUN) {
2542 continue;
2543 }
2544 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2545
2546 Mutation m = batchOp.getMutation(i);
2547 Durability tmpDur = getEffectiveDurability(m.getDurability());
2548 if (tmpDur.ordinal() > durability.ordinal()) {
2549 durability = tmpDur;
2550 }
2551 if (tmpDur == Durability.SKIP_WAL) {
2552 recordMutationWithoutWal(m.getFamilyCellMap());
2553 continue;
2554 }
2555
2556 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2557
2558
2559
2560 if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2561 if (walEdit.size() > 0) {
2562 assert isInReplay;
2563 if (!isInReplay) {
2564 throw new IOException("Multiple nonces per batch and not in replay");
2565 }
2566
2567 txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
2568 walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
2569 currentNonceGroup, currentNonce);
2570 hasWalAppends = true;
2571 walEdit = new WALEdit(isInReplay);
2572 }
2573 currentNonceGroup = nonceGroup;
2574 currentNonce = nonce;
2575 }
2576
2577
2578 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2579 if (fromCP != null) {
2580 for (KeyValue kv : fromCP.getKeyValues()) {
2581 walEdit.add(kv);
2582 }
2583 }
2584 addFamilyMapToWALEdit(familyMaps[i], walEdit);
2585 }
2586
2587
2588
2589
2590 Mutation mutation = batchOp.getMutation(firstIndex);
2591 if (walEdit.size() > 0) {
2592 txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
2593 walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
2594 true, currentNonceGroup, currentNonce);
2595 hasWalAppends = true;
2596 }
2597
2598
2599
2600
2601 if (locked) {
2602 this.updatesLock.readLock().unlock();
2603 locked = false;
2604 }
2605 releaseRowLocks(acquiredRowLocks);
2606
2607
2608
2609
2610 if (hasWalAppends) {
2611 syncOrDefer(txid, durability);
2612 }
2613 doRollBackMemstore = false;
2614
2615 if (!isInReplay && coprocessorHost != null) {
2616 MiniBatchOperationInProgress<Mutation> miniBatchOp =
2617 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2618 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2619 coprocessorHost.postBatchMutate(miniBatchOp);
2620 }
2621
2622
2623
2624
2625 if (w != null) {
2626 mvcc.completeMemstoreInsert(w);
2627 w = null;
2628 }
2629
2630
2631
2632
2633
2634 if (!isInReplay && coprocessorHost != null) {
2635 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2636
2637 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2638 != OperationStatusCode.SUCCESS) {
2639 continue;
2640 }
2641 Mutation m = batchOp.getMutation(i);
2642 if (m instanceof Put) {
2643 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2644 } else {
2645 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2646 }
2647 }
2648 }
2649
2650 success = true;
2651 return addedSize;
2652 } finally {
2653
2654
2655 if (doRollBackMemstore) {
2656 rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2657 }
2658 if (w != null) mvcc.completeMemstoreInsert(w);
2659
2660 if (locked) {
2661 this.updatesLock.readLock().unlock();
2662 }
2663 releaseRowLocks(acquiredRowLocks);
2664
2665
2666
2667
2668
2669
2670
2671 if (noOfPuts > 0) {
2672
2673 if (this.metricsRegion != null) {
2674 this.metricsRegion.updatePut();
2675 }
2676 }
2677 if (noOfDeletes > 0) {
2678
2679 if (this.metricsRegion != null) {
2680 this.metricsRegion.updateDelete();
2681 }
2682 }
2683 if (!success) {
2684 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2685 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2686 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2687 }
2688 }
2689 }
2690 if (coprocessorHost != null && !batchOp.isInReplay()) {
2691
2692
2693 MiniBatchOperationInProgress<Mutation> miniBatchOp =
2694 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2695 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2696 lastIndexExclusive);
2697 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2698 }
2699
2700 batchOp.nextIndexToProcess = lastIndexExclusive;
2701 }
2702 }
2703
2704
2705
2706
2707
2708 protected Durability getEffectiveDurability(Durability d) {
2709 return d == Durability.USE_DEFAULT ? this.durability : d;
2710 }
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
/**
 * Atomically checks one cell of {@code row} and, if the check passes,
 * applies the given {@link Put} or {@link Delete} through the batch path.
 *
 * <p>The row lock is taken before the cell is read and released only after
 * the mutation has been applied, so the check and the write happen under the
 * same row lock.
 *
 * @param row the row to check; must equal {@code w.getRow()}
 * @param family family of the cell to check; must exist in this region
 * @param qualifier qualifier of the cell to check
 * @param compareOp how the comparison result is interpreted
 * @param comparator holds the expected value; a null or zero-length value
 *        matches a missing cell or a cell with an empty value
 * @param w the Put or Delete to apply when the check passes
 * @param writeToWAL not read by this method
 * @return true if the mutation was applied, false if the check failed, or
 *         the value a post-row-lock coprocessor hook returned when non-null
 * @throws IOException if the region is read-only or over its blocking
 *         memstore size, if {@code w} is not a Put/Delete or targets another
 *         row, if the family does not exist, or if the read or write fails
 */
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
    CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
    boolean writeToWAL)
    throws IOException{
  checkReadOnly();
  // Reject early (before any locking) when the memstore is above its blocking size.
  checkResources();
  boolean isPut = w instanceof Put;
  if (!isPut && !(w instanceof Delete))
    throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
        "be Put or Delete");
  if (!Bytes.equals(row, w.getRow())) {
    throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
        "getRow must match the passed row");
  }

  startRegionOperation();
  try {
    Get get = new Get(row);
    checkFamily(family);
    get.addColumn(family, qualifier);

    // Lock the row before reading so the checked value cannot be changed by
    // another row-lock holder between the check and the write.
    RowLock rowLock = getRowLock(get.getRow());
    // NOTE(review): begins and immediately completes an MVCC write entry;
    // presumably this waits until earlier in-flight memstore writes are
    // visible so the get below observes them - confirm in
    // MultiVersionConsistencyControl.
    mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
    try {
      if (this.getCoprocessorHost() != null) {
        Boolean processed = null;
        if (w instanceof Put) {
          processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
              qualifier, compareOp, comparator, (Put) w);
        } else if (w instanceof Delete) {
          processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
              qualifier, compareOp, comparator, (Delete) w);
        }
        // A non-null hook result is returned as-is: the row is not read
        // and w is not applied.
        if (processed != null) {
          return processed;
        }
      }
      List<Cell> result = get(get, false);

      // A null/empty expected value means "cell is absent or has an empty value".
      boolean valueIsNull = comparator.getValue() == null ||
          comparator.getValue().length == 0;
      boolean matches = false;
      if (result.size() == 0 && valueIsNull) {
        matches = true;
      } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
          valueIsNull) {
        matches = true;
      } else if (result.size() == 1 && !valueIsNull) {
        Cell kv = result.get(0);
        // The comparator is the left-hand operand: compareResult is
        // comparator.compareTo(storedValue), and compareOp is applied to it.
        int compareResult = comparator.compareTo(kv.getValueArray(),
            kv.getValueOffset(), kv.getValueLength());
        switch (compareOp) {
          case LESS:
            matches = compareResult < 0;
            break;
          case LESS_OR_EQUAL:
            matches = compareResult <= 0;
            break;
          case EQUAL:
            matches = compareResult == 0;
            break;
          case NOT_EQUAL:
            matches = compareResult != 0;
            break;
          case GREATER_OR_EQUAL:
            matches = compareResult >= 0;
            break;
          case GREATER:
            matches = compareResult > 0;
            break;
          default:
            throw new RuntimeException("Unknown Compare op " + compareOp.name());
        }
      }
      // Any other case (e.g. more than one cell with a non-empty expected
      // value) leaves matches == false.

      if (matches) {
        // Applied through the batch path while this thread still holds the
        // row lock; getRowLockInternal hands back a lock already owned by
        // the current thread instead of blocking on it.
        doBatchMutate((Mutation)w);
        this.checkAndMutateChecksPassed.increment();
        return true;
      }
      this.checkAndMutateChecksFailed.increment();
      return false;
    } finally {
      rowLock.release();
    }
  } finally {
    closeRegionOperation();
  }
}
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
/**
 * Atomically checks one cell of {@code row} and, if the check passes,
 * applies all mutations in {@code rm} via {@link #mutateRow}.
 *
 * <p>The row lock is taken before the cell is read and released only after
 * the mutations have been applied.
 *
 * @param row the row whose cell is checked
 * @param family family of the cell to check; must exist in this region
 * @param qualifier qualifier of the cell to check
 * @param compareOp how the comparison result is interpreted
 * @param comparator holds the expected value; a null or zero-length value
 *        matches a missing cell or a cell with an empty value
 * @param rm the row mutations to apply when the check passes
 * @param writeToWAL not read by this method
 * @return true if the mutations were applied, false if the check failed
 * @throws IOException if the region is read-only or over its blocking
 *         memstore size, if the family does not exist, or if the read or
 *         write fails
 */
// NOTE(review): unlike checkAndMutate, this method does not verify that
// rm's row equals {@code row}; confirm mutateRow rejects a mismatch.
public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
    CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
    boolean writeToWAL)
    throws IOException{
  checkReadOnly();
  // Reject early (before any locking) when the memstore is above its blocking size.
  checkResources();

  startRegionOperation();
  try {
    Get get = new Get(row);
    checkFamily(family);
    get.addColumn(family, qualifier);

    // Lock the row before reading so the check and the write use the same lock.
    RowLock rowLock = getRowLock(get.getRow());
    // NOTE(review): begins and immediately completes an MVCC write entry;
    // presumably this waits until earlier in-flight memstore writes are
    // visible so the get below observes them - confirm in
    // MultiVersionConsistencyControl.
    mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
    try {
      List<Cell> result = get(get, false);

      // A null/empty expected value means "cell is absent or has an empty value".
      boolean valueIsNull = comparator.getValue() == null ||
          comparator.getValue().length == 0;
      boolean matches = false;
      if (result.size() == 0 && valueIsNull) {
        matches = true;
      } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
          valueIsNull) {
        matches = true;
      } else if (result.size() == 1 && !valueIsNull) {
        Cell kv = result.get(0);
        // The comparator is the left-hand operand: compareResult is
        // comparator.compareTo(storedValue), and compareOp is applied to it.
        int compareResult = comparator.compareTo(kv.getValueArray(),
            kv.getValueOffset(), kv.getValueLength());
        switch (compareOp) {
          case LESS:
            matches = compareResult < 0;
            break;
          case LESS_OR_EQUAL:
            matches = compareResult <= 0;
            break;
          case EQUAL:
            matches = compareResult == 0;
            break;
          case NOT_EQUAL:
            matches = compareResult != 0;
            break;
          case GREATER_OR_EQUAL:
            matches = compareResult >= 0;
            break;
          case GREATER:
            matches = compareResult > 0;
            break;
          default:
            throw new RuntimeException("Unknown Compare op " + compareOp.name());
        }
      }

      if (matches) {
        // Applied while this thread still holds the row lock.
        mutateRow(rm);
        this.checkAndMutateChecksPassed.increment();
        return true;
      }
      this.checkAndMutateChecksFailed.increment();
      return false;
    } finally {
      rowLock.release();
    }
  } finally {
    closeRegionOperation();
  }
}
2907
2908 private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2909
2910 OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2911 HConstants.NO_NONCE, HConstants.NO_NONCE);
2912 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2913 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2914 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2915 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2916 }
2917 }
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932 public void addRegionToSnapshot(SnapshotDescription desc,
2933 ForeignExceptionSnare exnSnare) throws IOException {
2934 Path rootDir = FSUtils.getRootDir(conf);
2935 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2936
2937 SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
2938 snapshotDir, desc, exnSnare);
2939 manifest.addRegion(this);
2940 }
2941
2942
2943
2944
2945
2946 void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
2947 for (List<Cell> cells: keyLists) {
2948 if (cells == null) continue;
2949 assert cells instanceof RandomAccess;
2950 int listSize = cells.size();
2951 for (int i=0; i < listSize; i++) {
2952 Cell cell = cells.get(i);
2953 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2954 kv.updateLatestStamp(now);
2955 }
2956 }
2957 }
2958
2959
2960
2961
2962 void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
2963
2964
2965
2966 if (m.getTTL() == Long.MAX_VALUE) {
2967 return;
2968 }
2969
2970
2971
2972 for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
2973 List<Cell> cells = e.getValue();
2974 assert cells instanceof RandomAccess;
2975 int listSize = cells.size();
2976 for (int i = 0; i < listSize; i++) {
2977 Cell cell = cells.get(i);
2978 List<Tag> newTags = new ArrayList<Tag>();
2979 Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
2980 cell.getTagsOffset(), cell.getTagsLengthUnsigned());
2981
2982
2983
2984 while (tagIterator.hasNext()) {
2985
2986
2987
2988 newTags.add(tagIterator.next());
2989 }
2990
2991
2992
2993
2994
2995 if (m.getTTL() != Long.MAX_VALUE) {
2996
2997 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
2998 }
2999
3000
3001
3002 cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3003 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3004 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3005 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3006 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3007 newTags));
3008 }
3009 }
3010 }
3011
3012
3013
3014
3015
3016
3017
3018 private void checkResources()
3019 throws RegionTooBusyException {
3020
3021 if (this.getRegionInfo().isMetaRegion()) return;
3022
3023 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3024 blockedRequestsCount.increment();
3025 requestFlush();
3026 throw new RegionTooBusyException("Above memstore limit, " +
3027 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3028 this.getRegionInfo().getRegionNameAsString()) +
3029 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3030 this.getRegionServerServices().getServerName()) +
3031 ", memstoreSize=" + memstoreSize.get() +
3032 ", blockingMemStoreSize=" + blockingMemStoreSize);
3033 }
3034 }
3035
3036
3037
3038
3039 protected void checkReadOnly() throws IOException {
3040 if (this.writestate.isReadOnly()) {
3041 throw new IOException("region is read only");
3042 }
3043 }
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053 private void put(final byte [] row, byte [] family, List<Cell> edits)
3054 throws IOException {
3055 NavigableMap<byte[], List<Cell>> familyMap;
3056 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3057
3058 familyMap.put(family, edits);
3059 Put p = new Put(row);
3060 p.setFamilyCellMap(familyMap);
3061 doBatchMutate(p);
3062 }
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3077 MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
3078 long size = 0;
3079 boolean freemvcc = false;
3080
3081 try {
3082 if (localizedWriteEntry == null) {
3083 localizedWriteEntry = mvcc.beginMemstoreInsert();
3084 freemvcc = true;
3085 }
3086
3087 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3088 byte[] family = e.getKey();
3089 List<Cell> cells = e.getValue();
3090 assert cells instanceof RandomAccess;
3091 Store store = getStore(family);
3092 int listSize = cells.size();
3093 for (int i=0; i < listSize; i++) {
3094 Cell cell = cells.get(i);
3095 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3096 kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
3097 size += store.add(kv);
3098 }
3099 }
3100 } finally {
3101 if (freemvcc) {
3102 mvcc.completeMemstoreInsert(localizedWriteEntry);
3103 }
3104 }
3105
3106 return size;
3107 }
3108
3109
3110
3111
3112
3113
3114 private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
3115 Map<byte[], List<Cell>>[] familyMaps,
3116 int start, int end) {
3117 int kvsRolledback = 0;
3118 for (int i = start; i < end; i++) {
3119
3120 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3121 != OperationStatusCode.SUCCESS) {
3122 continue;
3123 }
3124
3125
3126 Map<byte[], List<Cell>> familyMap = familyMaps[i];
3127 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3128 byte[] family = e.getKey();
3129 List<Cell> cells = e.getValue();
3130
3131
3132
3133
3134 Store store = getStore(family);
3135 for (Cell cell: cells) {
3136 store.rollback(KeyValueUtil.ensureKeyValue(cell));
3137 kvsRolledback++;
3138 }
3139 }
3140 }
3141 LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
3142 " keyvalues from start:" + start + " to end:" + end);
3143 }
3144
3145
3146
3147
3148
3149 void checkFamilies(Collection<byte[]> families)
3150 throws NoSuchColumnFamilyException {
3151 for (byte[] family : families) {
3152 checkFamily(family);
3153 }
3154 }
3155
3156
3157
3158
3159
3160 private void removeNonExistentColumnFamilyForReplay(
3161 final Map<byte[], List<Cell>> familyMap) {
3162 List<byte[]> nonExistentList = null;
3163 for (byte[] family : familyMap.keySet()) {
3164 if (!this.htableDescriptor.hasFamily(family)) {
3165 if (nonExistentList == null) {
3166 nonExistentList = new ArrayList<byte[]>();
3167 }
3168 nonExistentList.add(family);
3169 }
3170 }
3171 if (nonExistentList != null) {
3172 for (byte[] family : nonExistentList) {
3173
3174 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3175 familyMap.remove(family);
3176 }
3177 }
3178 }
3179
3180 void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3181 long now) throws FailedSanityCheckException {
3182 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3183 return;
3184 }
3185 long maxTs = now + timestampSlop;
3186 for (List<Cell> kvs : familyMap.values()) {
3187 assert kvs instanceof RandomAccess;
3188 int listSize = kvs.size();
3189 for (int i=0; i < listSize; i++) {
3190 Cell cell = kvs.get(i);
3191
3192 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3193 if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3194 throw new FailedSanityCheckException("Timestamp for KV out of range "
3195 + cell + " (too.new=" + timestampSlop + ")");
3196 }
3197 }
3198 }
3199 }
3200
3201
3202
3203
3204
3205
3206
3207 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3208 WALEdit walEdit) {
3209 for (List<Cell> edits : familyMap.values()) {
3210 assert edits instanceof RandomAccess;
3211 int listSize = edits.size();
3212 for (int i=0; i < listSize; i++) {
3213 Cell cell = edits.get(i);
3214 walEdit.add(KeyValueUtil.ensureKeyValue(cell));
3215 }
3216 }
3217 }
3218
3219 private void requestFlush() {
3220 if (this.rsServices == null) {
3221 return;
3222 }
3223 synchronized (writestate) {
3224 if (this.writestate.isFlushRequested()) {
3225 return;
3226 }
3227 writestate.flushRequested = true;
3228 }
3229
3230 this.rsServices.getFlushRequester().requestFlush(this);
3231 if (LOG.isDebugEnabled()) {
3232 LOG.debug("Flush requested on " + this);
3233 }
3234 }
3235
3236
3237
3238
3239
3240 private boolean isFlushSize(final long size) {
3241 return size > this.memstoreFlushSize;
3242 }
3243
3244
3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
/**
 * Replays any recovered-edits files found under {@code regiondir} into this
 * region, flushes if anything newer than the stores' contents was replayed,
 * and then deletes all recovered-edits files that were found.
 *
 * <p>A file whose name, parsed as a number, is not greater than the smallest
 * value in {@code maxSeqIdInStores} is skipped without being opened.
 * Zero-length files are deleted without being replayed.
 *
 * @param regiondir region directory to search for recovered edits
 * @param maxSeqIdInStores per-family max sequence id already in each store;
 *        keyed by family name, as read back in replayRecoveredEdits
 * @param reporter progress callback passed to the per-file replay; may be null
 * @param status task status passed to the flush
 * @return the highest sequence id returned by any replayed file, or the
 *         minimum of {@code maxSeqIdInStores} values (-1 if the map is empty)
 *         when nothing newer was replayed
 * @throws IOException if replaying a file fails and
 *         {@code HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS} (or the
 *         deprecated "hbase.skip.errors") is not true
 */
protected long replayRecoveredEditsIfAny(final Path regiondir,
    Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, final MonitoredTask status)
    throws UnsupportedEncodingException, IOException {
  // Smallest per-store max sequence id; -1 means "not yet set".
  long minSeqIdForTheRegion = -1;
  for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
    if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
      minSeqIdForTheRegion = maxSeqIdInStore;
    }
  }
  long seqid = minSeqIdForTheRegion;

  FileSystem fs = this.fs.getFileSystem();
  NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
  }

  if (files == null || files.isEmpty()) return seqid;

  for (Path edits: files) {
    if (edits == null || !fs.exists(edits)) {
      LOG.warn("Null or non-existent edits file: " + edits);
      continue;
    }
    if (isZeroLengthThenDelete(fs, edits)) continue;

    // The file name is the file's maximum sequence id.
    // NOTE(review): Math.abs is applied to the parsed name; the reason is not
    // visible here. A non-numeric name would throw NumberFormatException.
    long maxSeqId;
    String fileName = edits.getName();
    maxSeqId = Math.abs(Long.parseLong(fileName));
    if (maxSeqId <= minSeqIdForTheRegion) {
      // Every store already has everything this file could contain.
      if (LOG.isDebugEnabled()) {
        String msg = "Maximum sequenceid for this log is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
        LOG.debug(msg);
      }
      continue;
    }

    try {
      seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
    } catch (IOException e) {
      // The new key wins; the deprecated "hbase.skip.errors" is only the fallback default.
      boolean skipErrors = conf.getBoolean(
          HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
          conf.getBoolean(
              "hbase.skip.errors",
              HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
      if (conf.get("hbase.skip.errors") != null) {
        LOG.warn(
            "The property 'hbase.skip.errors' has been deprecated. Please use " +
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
      }
      if (skipErrors) {
        Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
        LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
            + "=true so continuing. Renamed " + edits +
            " as " + p, e);
      } else {
        throw e;
      }
    }
  }

  // Replay is done; clear this region's replay-edits size accounting.
  if (this.rsAccounting != null) {
    this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
  }
  if (seqid > minSeqIdForTheRegion) {
    // Something newer than the stores' contents was replayed: persist it.
    internalFlushcache(null, seqid, status);
  }

  // NOTE(review): this also visits files that were already deleted
  // (zero-length) or presumably renamed away (moveAsideBadEditsFile), for
  // which fs.delete is likely to return false and log "Failed delete".
  for (Path file: files) {
    if (!fs.delete(file, false)) {
      LOG.error("Failed delete of " + file);
    } else {
      LOG.debug("Deleted recovered.edits file=" + file);
    }
  }
  return seqid;
}
3364
3365
3366
3367
3368
3369
3370
3371
3372
3373
/**
 * Replays one recovered-edits file into this region's memstore.
 *
 * <p>The following edits are skipped:
 * <ul>
 * <li>METAFAMILY edits and edits for other regions. Compaction markers among
 *     them are still completed.</li>
 * <li>Edits for families with no store.</li>
 * <li>Edits whose sequence id is at or below the store's entry in
 *     {@code maxSeqIdInStores}.</li>
 * </ul>
 * A flush is triggered whenever {@link #restoreEdit} reports that the flush
 * size was exceeded.
 *
 * <p>An EOF, or an IOException caused by a ParseException, moves the file
 * aside and ends the replay of this file normally. Any other IOException is
 * rethrown.
 *
 * @param edits the recovered-edits file
 * @param maxSeqIdInStores per-family max sequence id already in each store
 * @param reporter progress callback; may be null. A false return aborts the replay.
 * @return sequence id of the last entry read, or -1 if none was read
 * @throws IOException on read failure or when the reporter reports failure
 */
private long replayRecoveredEdits(final Path edits,
    Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
  String msg = "Replaying edits from " + edits;
  LOG.info(msg);
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  FileSystem fs = this.fs.getFileSystem();

  status.setStatus("Opening logs");
  HLog.Reader reader = null;
  try {
    reader = HLogFactory.createReader(fs, edits, conf);
    long currentEditSeqId = -1;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    HLog.Entry entry;
    // Cached store of the previous cell; looked up again only when the family changes.
    Store store = null;
    boolean reported_once = false;
    ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

    try {
      // Number of edits between clock checks for progress reporting.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
          2000);
      // Minimum ms between progress reports; defaults to half the assignment timeout.
      int period = this.conf.getInt("hbase.hstore.report.period",
          this.conf.getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
              AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT) / 2);
      long lastReport = EnvironmentEdgeManager.currentTimeMillis();

      while ((entry = reader.next()) != null) {
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        // Report each entry's nonce to the region server's nonce manager.
        if (ng != null) {
          ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
        }

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            // Only look at the clock once every `interval` edits.
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTimeMillis();
            if (lastReport + period <= cur) {
              status.setStatus("Replaying edits..." +
                  " skipped=" + skippedEdits +
                  " edits=" + editsCount);
              // A false return from the reporter aborts the replay.
              if(!reporter.progress()) {
                msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                status.abort(msg);
                throw new IOException(msg);
              }
              reported_once = true;
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        // Tracked for every entry, including ones skipped below.
        currentEditSeqId = key.getLogSeqNum();

        // A true return from the pre-restore hook skips this entry,
        // including the post-restore hook.
        if (coprocessorHost != null) {
          status.setStatus("Running pre-WAL-restore hook in coprocessors");
          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
            continue;
          }
        }

        boolean flush = false;
        for (KeyValue kv: val.getKeyValues()) {
          // Meta edits and edits addressed to another region are not applied.
          if (kv.matchingFamily(WALEdit.METAFAMILY) ||
              !Bytes.equals(key.getEncodedRegionName(),
                  this.getRegionInfo().getEncodedNameAsBytes())) {
            // Compaction markers are still completed.
            CompactionDescriptor compaction = WALEdit.getCompaction(kv);
            if (compaction != null) {
              completeCompactionMarker(compaction);
            }

            skippedEdits++;
            continue;
          }

          if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
            store = this.stores.get(kv.getFamily());
          }
          if (store == null) {
            // No store for this family in the region; skip the cell.
            LOG.warn("No family for " + kv);
            skippedEdits++;
            continue;
          }

          // Skip edits the store already has.
          // NOTE(review): auto-unboxing; NPE if the family is missing from maxSeqIdInStores.
          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
              .getName())) {
            skippedEdits++;
            continue;
          }

          // restoreEdit returns true once the memstore exceeds the flush size.
          flush |= restoreEdit(store, kv);
          editsCount++;
        }
        if (flush) {
          internalFlushcache(null, currentEditSeqId, status);
        }

        if (coprocessorHost != null) {
          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
        }
      }
    } catch (EOFException eof) {
      // Truncated file: move it aside and keep what was replayed so far.
      Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
      msg = "Encountered EOF. Most likely due to Master failure during " +
          "log spliting, so we have this data in another edit. " +
          "Continuing, but renaming " + edits + " as " + p;
      LOG.warn(msg, eof);
      status.abort(msg);
    } catch (IOException ioe) {
      // An IOException caused by a ParseException is treated as file corruption:
      // move the file aside and continue. Anything else is rethrown.
      if (ioe.getCause() instanceof ParseException) {
        Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
        msg = "File corruption encountered! " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, ioe);
        status.setStatus(msg);
      } else {
        status.abort(StringUtils.stringifyException(ioe));
        throw ioe;
      }
    }
    // Make sure the reporter is called at least once per file.
    if (reporter != null && !reported_once) {
      reporter.progress();
    }
    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceidInLog=" + firstSeqIdInLog +
        ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
    status.markComplete(msg);
    LOG.debug(msg);
    return currentEditSeqId;
  } finally {
    status.cleanup();
    if (reader != null) {
      reader.close();
    }
  }
}
3539
3540
3541
3542
3543
3544
3545
3546 void completeCompactionMarker(CompactionDescriptor compaction)
3547 throws IOException {
3548 Store store = this.getStore(compaction.getFamilyName().toByteArray());
3549 if (store == null) {
3550 LOG.warn("Found Compaction WAL edit for deleted family:" +
3551 Bytes.toString(compaction.getFamilyName().toByteArray()));
3552 return;
3553 }
3554 store.completeCompactionMarker(compaction);
3555 }
3556
3557
3558
3559
3560
3561
3562
3563 protected boolean restoreEdit(final Store s, final KeyValue kv) {
3564 long kvSize = s.add(kv);
3565 if (this.rsAccounting != null) {
3566 rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3567 }
3568 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3569 }
3570
3571
3572
3573
3574
3575
3576
3577 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3578 throws IOException {
3579 FileStatus stat = fs.getFileStatus(p);
3580 if (stat.getLen() > 0) return false;
3581 LOG.warn("File " + p + " is zero-length, deleting.");
3582 fs.delete(p, false);
3583 return true;
3584 }
3585
3586 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3587 return new HStore(this, family, this.conf);
3588 }
3589
3590
3591
3592
3593
3594
3595
3596
3597 public Store getStore(final byte[] column) {
3598 return this.stores.get(column);
3599 }
3600
3601 public Map<byte[], Store> getStores() {
3602 return this.stores;
3603 }
3604
3605
3606
3607
3608
3609
3610
3611
3612 public List<String> getStoreFileList(final byte [][] columns)
3613 throws IllegalArgumentException {
3614 List<String> storeFileNames = new ArrayList<String>();
3615 synchronized(closeLock) {
3616 for(byte[] column : columns) {
3617 Store store = this.stores.get(column);
3618 if (store == null) {
3619 throw new IllegalArgumentException("No column family : " +
3620 new String(column) + " available");
3621 }
3622 for (StoreFile storeFile: store.getStorefiles()) {
3623 storeFileNames.add(storeFile.getPath().toString());
3624 }
3625 }
3626 }
3627 return storeFileNames;
3628 }
3629
3630
3631
3632
3633
3634 void checkRow(final byte [] row, String op) throws IOException {
3635 if (!rowIsInRange(getRegionInfo(), row)) {
3636 throw new WrongRegionException("Requested row out of range for " +
3637 op + " on HRegion " + this + ", startKey='" +
3638 Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3639 Bytes.toStringBinary(getEndKey()) + "', row='" +
3640 Bytes.toStringBinary(row) + "'");
3641 }
3642 }
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653 public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3654 startRegionOperation();
3655 try {
3656 return getRowLockInternal(row, waitForLock);
3657 } finally {
3658 closeRegionOperation();
3659 }
3660 }
3661
3662
3663
3664
3665
/**
 * Acquires, or re-enters, the lock for {@code row}.
 *
 * <p>Locks are tracked in {@code lockedRows}, keyed by the hashed row.
 * <ul>
 * <li>If no context exists for the row, this thread's new context is
 *     installed.</li>
 * <li>If the existing context is owned by the current thread, it is
 *     reused.</li>
 * <li>Otherwise the method either returns null ({@code waitForLock} false)
 *     or waits up to {@code rowLockWaitDuration} ms on that context's latch
 *     and then retries.</li>
 * </ul>
 *
 * @param row row to lock; must be within this region
 * @param waitForLock whether to wait when another thread holds the row
 * @return a new lock handle from the row's context, or null if the row is
 *         held by another thread and {@code waitForLock} is false
 * @throws WrongRegionException if the row is outside this region
 * @throws IOException if waiting for the lock timed out
 * @throws InterruptedIOException if interrupted while waiting
 */
protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
  checkRow(row, "row lock");
  HashedBytes rowKey = new HashedBytes(row);
  RowLockContext rowLockContext = new RowLockContext(rowKey);

  // Loop until this thread installs its own context or finds one it already owns.
  while (true) {
    RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
    if (existingContext == null) {
      // No one held the row: our context is now installed.
      break;
    } else if (existingContext.ownedByCurrentThread()) {
      // Re-entrant acquisition by the owning thread.
      rowLockContext = existingContext;
      break;
    } else {
      // Held by another thread.
      if (!waitForLock) {
        return null;
      }
      TraceScope traceScope = null;
      try {
        if (Trace.isTracing()) {
          traceScope = Trace.startSpan("HRegion.getRowLockInternal");
        }
        if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
          if(traceScope != null) {
            traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
          }
          throw new IOException("Timed out waiting for lock for row: " + rowKey);
        }
        if (traceScope != null) traceScope.close();
        traceScope = null;
      } catch (InterruptedException ie) {
        // NOTE(review): the thread's interrupt flag is not restored here;
        // only an InterruptedIOException carrying the cause is thrown.
        LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
        InterruptedIOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      } finally {
        if (traceScope != null) traceScope.close();
      }
      // Latch released: loop and try to install our context again.
    }
  }

  // Each call returns its own handle on the (possibly shared) context.
  return rowLockContext.newLock();
}
3713
3714
3715
3716
3717
3718
3719
3720 public RowLock getRowLock(byte[] row) throws IOException {
3721 return getRowLock(row, true);
3722 }
3723
3724
3725
3726
3727 public void releaseRowLocks(List<RowLock> rowLocks) {
3728 if (rowLocks != null) {
3729 for (RowLock rowLock : rowLocks) {
3730 rowLock.release();
3731 }
3732 rowLocks.clear();
3733 }
3734 }
3735
3736
3737
3738
3739
3740
3741
3742 private static boolean hasMultipleColumnFamilies(
3743 List<Pair<byte[], String>> familyPaths) {
3744 boolean multipleFamilies = false;
3745 byte[] family = null;
3746 for (Pair<byte[], String> pair : familyPaths) {
3747 byte[] fam = pair.getFirst();
3748 if (family == null) {
3749 family = fam;
3750 } else if (!Bytes.equals(family, fam)) {
3751 multipleFamilies = true;
3752 break;
3753 }
3754 }
3755 return multipleFamilies;
3756 }
3757
3758
3759 public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3760 boolean assignSeqId) throws IOException {
3761 return bulkLoadHFiles(familyPaths, assignSeqId, null);
3762 }
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
/**
 * Loads a group of HFiles into this region, one file per (family, path) pair.
 *
 * <p>Every pair is validated before anything is loaded. If any pair fails validation with
 * a non-recoverable error, nothing is loaded and an IOException is thrown. If any pair
 * fails with a {@code WrongRegionException}, nothing is loaded and {@code false} is
 * returned so the caller can retry; the log message attributes this to a likely split.
 * Once loading starts, an IO failure on one file is rethrown. Files loaded before it stay
 * loaded, which the log message calls a "partial failure".
 *
 * @param familyPaths pairs of column family name and HFile path; must not be null
 * @param assignSeqId if true, flush first and give the loaded files a sequence id
 * @param bulkLoadListener optional callbacks invoked around each file load; may be null
 * @return true if all files were loaded, false if a recoverable validation failure occurred
 * @throws IOException if a family is unknown, validation fails non-recoverably, the
 *     required flush does not run, or loading a file fails
 */
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
    BulkLoadListener bulkLoadListener) throws IOException {
  Preconditions.checkNotNull(familyPaths);
  // Guard the region for the duration of the bulk load; the argument tells the guard
  // whether more than one column family is involved.
  startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
  try {
    this.writeRequestsCount.increment();

    // Phase 1: validate every pair before loading anything.
    // ioes collects non-recoverable problems (unknown family, other IO errors);
    // failures collects pairs rejected with WrongRegionException (recoverable).
    List<IOException> ioes = new ArrayList<IOException>();
    List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();

      Store store = getStore(familyName);
      if (store == null) {
        IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
            "No such column family " + Bytes.toStringBinary(familyName));
        ioes.add(ioe);
      } else {
        try {
          store.assertBulkLoadHFileOk(new Path(path));
        } catch (WrongRegionException wre) {
          // Recoverable: reported to the caller via a false return below.
          failures.add(p);
        } catch (IOException ioe) {
          // Non-recoverable: aggregated and thrown below.
          ioes.add(ioe);
        }
      }
    }

    // Any non-recoverable validation error aborts the whole load.
    if (ioes.size() != 0) {
      IOException e = MultipleIOException.createIOException(ioes);
      LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
      throw e;
    }

    // Any recoverable failure also aborts the load, but signals it with false.
    if (failures.size() != 0) {
      StringBuilder list = new StringBuilder();
      for (Pair<byte[], String> p : failures) {
        list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
          .append(p.getSecond());
      }
      // problem when validating
      LOG.warn("There was a recoverable bulk load failure likely due to a" +
          " split. These (family, HFile) pairs were not loaded: " + list);
      return false;
    }

    long seqId = -1;
    // When a sequence id is requested, flush first. A successful flush supplies its flush
    // sequence id; an empty memstore means the region sequence id is bumped instead; any
    // other flush outcome aborts the load.
    if (assignSeqId) {
      FlushResult fs = this.flushcache();
      if (fs.isFlushSucceeded()) {
        seqId = fs.flushSequenceId;
      } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
        seqId = this.sequenceId.incrementAndGet();
      } else {
        throw new IOException("Could not bulk load with an assigned sequential ID because the " +
            "flush didn't run. Reason for not flushing: " + fs.failureReason);
      }
    }

    // Phase 2: load each file. The listener may substitute the path that is actually
    // loaded (prepareBulkLoad); doneBulkLoad/failedBulkLoad receive the original path.
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();
      Store store = getStore(familyName);
      try {
        String finalPath = path;
        if(bulkLoadListener != null) {
          finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
        }
        store.bulkLoadHFile(finalPath, seqId);
        if(bulkLoadListener != null) {
          bulkLoadListener.doneBulkLoad(familyName, path);
        }
      } catch (IOException ioe) {
        // Files loaded earlier in this loop are not undone here; the failure is logged,
        // the listener is notified (its own errors are only logged), and the cause rethrown.
        LOG.error("There was a partial failure due to IO when attempting to" +
            " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
        if(bulkLoadListener != null) {
          try {
            bulkLoadListener.failedBulkLoad(familyName, path);
          } catch (Exception ex) {
            LOG.error("Error while calling failedBulkLoad for family "+
                Bytes.toString(familyName)+" with path "+path, ex);
          }
        }
        throw ioe;
      }
    }
    return true;
  } finally {
    closeBulkRegionOperation();
  }
}
3882
3883 @Override
3884 public boolean equals(Object o) {
3885 return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3886 ((HRegion) o).getRegionName());
3887 }
3888
3889 @Override
3890 public int hashCode() {
3891 return Bytes.hashCode(this.getRegionName());
3892 }
3893
3894 @Override
3895 public String toString() {
3896 return this.getRegionNameAsString();
3897 }
3898
3899
3900
3901
/**
 * RegionScanner implementation used for scans and gets on this region.
 *
 * <p>Cells come from two merged heaps. {@code storeHeap} holds the scanners that are
 * always read: any additional scanners, plus families that are "essential" to the
 * filter. {@code joinedHeap} holds scanners for families loaded on demand. It is created
 * only when such families exist and is read only for rows that survived the filter.
 *
 * <p>This is a non-static inner class: it reads {@code stores}, {@code comparator},
 * {@code scannerReadPoints} and other state of the enclosing HRegion.
 */
class RegionScannerImpl implements RegionScanner {
  // Heap over the always-read scanners (additional scanners + essential families).
  KeyValueHeap storeHeap = null;
  // Heap over on-demand families; stays null when there are none.
  KeyValueHeap joinedHeap = null;
  // Non-null while a row's joined-heap cells were cut off by the batch limit: holds a
  // KeyValue of that row, and the next call resumes reading it from joinedHeap.
  protected KeyValue joinedContinuationRow = null;
  // Identity sentinel returned by populateResult when the batch limit is reached.
  private final KeyValue KV_LIMIT = new KeyValue();
  // Stop row, or null when the scan has no stop row (see constructor).
  protected final byte[] stopRow;
  private final FilterWrapper filter;
  private int batch;
  // Threshold used by isStopRow: 0 for normal scans (stop row exclusive),
  // -1 for get-scans (stop row inclusive).
  protected int isScan;
  // Set by close(); next() then throws UnknownScannerException.
  private boolean filterClosed = false;
  // MVCC read point this scanner reads at; registered in scannerReadPoints.
  private long readPt;
  private long maxResultSize;
  protected HRegion region;

  @Override
  public HRegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  /**
   * Opens scanners for every family in the scan and arranges them into the store/joined heaps.
   *
   * @param scan the scan specification
   * @param additionalScanners extra scanners always placed in the store heap; may be null
   * @param region the region being scanned
   * @throws IOException if a store scanner cannot be created
   */
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
      throws IOException {

    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }

    this.batch = scan.getBatch();
    // An empty stop row means "no stop row" for normal scans; get-scans keep it as-is.
    if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
      this.stopRow = null;
    } else {
      this.stopRow = scan.getStopRow();
    }

    // -1 makes isStopRow treat the stop row as inclusive for get-scans.
    this.isScan = scan.isGetScan() ? -1 : 0;

    // Pick the read point and register it atomically with respect to other users of
    // scannerReadPoints.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    synchronized(scannerReadPoints) {
      this.readPt = getReadpoint(isolationLevel);
      scannerReadPoints.put(this, this.readPt);
    }

    // Split store scanners: without a filter, without on-demand loading, or for families
    // the filter deems essential, the scanner goes to the store heap; otherwise it goes
    // to the joined heap.
    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
    List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
    if (additionalScanners != null) {
      scanners.addAll(additionalScanners);
    }

    for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
        scan.getFamilyMap().entrySet()) {
      Store store = stores.get(entry.getKey());
      KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
      if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
        || this.filter.isFamilyEssential(entry.getKey())) {
        scanners.add(scanner);
      } else {
        joinedScanners.add(scanner);
      }
    }
    initializeKVHeap(scanners, joinedScanners, region);
  }

  RegionScannerImpl(Scan scan, HRegion region) throws IOException {
    this(scan, null, region);
  }

  /**
   * Builds the store heap, and the joined heap only if there are joined scanners.
   */
  protected void initializeKVHeap(List<KeyValueScanner> scanners,
      List<KeyValueScanner> joinedScanners, HRegion region)
      throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, region.comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
    }
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  /**
   * Resets the filter's per-row state, if a filter is set.
   */
  protected void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  /**
   * Reads the next row, limited to the scan's batch size.
   */
  @Override
  public boolean next(List<Cell> outResults)
      throws IOException {
    // apply the batching limit by default
    return next(outResults, batch);
  }

  /**
   * Reads the next row (or up to {@code limit} cells of it) inside a region operation,
   * counting the read request and reporting the returned size to the region metrics.
   *
   * @return true if more rows may follow
   * @throws UnknownScannerException if this scanner has been closed
   */
  @Override
  public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) " +
          "after we renewed it. Could be caused by a very slow scanner " +
          "or a lengthy garbage collection");
    }
    startRegionOperation(Operation.SCAN);
    readRequestsCount.increment();
    try {
      boolean returnResult = nextRaw(outResults, limit);
      if (region != null && region.metricsRegion != null) {
        long totalSize = 0;
        for (Cell cell: outResults) {
          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
          totalSize += kv.getLength();
        }
        region.metricsRegion.updateScanNext(totalSize);
      }
      return returnResult;
    } finally {
      closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<Cell> outResults)
      throws IOException {
    return nextRaw(outResults, batch);
  }

  /**
   * Reads the next row without the bookkeeping done by next(): not synchronized, no
   * region operation, no request counting, no metrics. Cells are appended to outResults.
   *
   * @return true if more rows may follow; false once the filter reports it is done
   * @throws UnknownScannerException if the store heap has been released by close()
   */
  @Override
  public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean returnResult;
    if (outResults.isEmpty()) {
      // nextInternal requires an empty list, so it can fill outResults directly.
      returnResult = nextInternal(outResults, limit);
    } else {
      // Otherwise collect into a scratch list and append.
      List<Cell> tmpList = new ArrayList<Cell>();
      returnResult = nextInternal(tmpList, limit);
      outResults.addAll(tmpList);
    }
    resetFilters();
    if (isFilterDoneInternal()) {
      returnResult = false;
    }
    return returnResult;
  }

  /**
   * Appends cells of joinedContinuationRow's row from the joined heap. It clears the
   * continuation marker unless the limit was hit, then sorts results with the region
   * comparator because store-heap and joined-heap cells were appended separately.
   */
  private void populateFromJoinedHeap(List<Cell> results, int limit)
      throws IOException {
    assert joinedContinuationRow != null;
    KeyValue kv = populateResult(results, this.joinedHeap, limit,
        joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
        joinedContinuationRow.getRowLength());
    if (kv != KV_LIMIT) {
      // Row fully read from the joined heap; no continuation needed.
      joinedContinuationRow = null;
    }
    // Merge store-heap and joined-heap cells into sorted order.
    Collections.sort(results, comparator);
  }

  /**
   * Moves cells of the given row from {@code heap} into {@code results}.
   *
   * @param results output list
   * @param heap heap to read from
   * @param limit maximum total size of results; a non-positive limit never triggers KV_LIMIT here
   * @param currentRow buffer holding the row key
   * @param offset row key offset in currentRow
   * @param length row key length
   * @return KV_LIMIT if the limit was reached, otherwise the next KeyValue in the heap
   *     (the first one not on this row), or null if the heap is exhausted
   */
  private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
      byte[] currentRow, int offset, short length) throws IOException {
    KeyValue nextKv;
    do {
      heap.next(results, limit - results.size());
      if (limit > 0 && results.size() == limit) {
        return KV_LIMIT;
      }
      nextKv = heap.peek();
    } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));

    return nextKv;
  }

  /**
   * @return true if the filter says no further rows can match
   */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  /**
   * Core row-assembly loop. It skips rows rejected by the filter (by row key, by
   * row-level filtering, or because they are empty) and fills {@code results} with the
   * next accepted row. The output is truncated at {@code limit} cells, in which case the
   * method returns true and the row continues on the next call.
   *
   * @param results must be empty on entry
   * @return true if more rows may follow, false when the stop row / end of data is
   *     reached or the coprocessor hook in nextRow() says to stop
   * @throws IllegalArgumentException if results is not empty
   * @throws CallerDisconnectedException if the RPC caller has disconnected
   * @throws IncompatibleFilterException if a row-level filter is used together with a limit
   */
  private boolean nextInternal(List<Cell> results, int limit)
      throws IOException {
    if (!results.isEmpty()) {
      throw new IllegalArgumentException("First parameter should be an empty list");
    }
    RpcCallContext rpcCall = RpcServer.getCurrentCall();
    // Loop until a row is produced or the scan ends; rows rejected by the filter
    // cause another iteration.
    while (true) {
      if (rpcCall != null) {
        // Stop doing work for a client that has already gone away.
        long afterTime = rpcCall.disconnectSince();
        if (afterTime >= 0) {
          throw new CallerDisconnectedException(
              "Aborting on region " + getRegionNameAsString() + ", call " +
                  this + " after " + afterTime + " ms, since " +
                  "caller disconnected");
        }
      }

      // Row key of the next cell in the store heap (null row when the heap is empty).
      KeyValue current = this.storeHeap.peek();

      byte[] currentRow = null;
      int offset = 0;
      short length = 0;
      if (current != null) {
        currentRow = current.getBuffer();
        offset = current.getRowOffset();
        length = current.getRowLength();
      }
      boolean stopRow = isStopRow(currentRow, offset, length);
      // Either start a new row from the store heap, or continue a row whose
      // joined-heap cells were cut off by the limit on the previous call.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (stopRow) {
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRowCells(results);
          }
          return false;
        }

        // Skip the whole row if the filter rejects it by key alone.
        if (filterRowKey(currentRow, offset, length)) {
          boolean moreRows = nextRow(currentRow, offset, length);
          if (!moreRows) return false;
          results.clear();
          continue;
        }

        KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
            length);
        // Limit reached mid-row: return the partial row now. Row-level filters cannot
        // work on partial rows, hence the exception.
        if (nextKv == KV_LIMIT) {
          if (this.filter != null && filter.hasFilterRow()) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
          }
          return true;
        }

        // Is the row after this one past the stop row (or absent)?
        stopRow = nextKv == null ||
            isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
        // Remember emptiness before filterRowCells can modify results.
        final boolean isEmptyRow = results.isEmpty();

        // Give a row-level filter a chance to transform or exclude the row's cells.
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (filter != null && filter.hasFilterRow()) {
          ret = filter.filterRowCellsWithRet(results);
        }

        // Drop the row if it was empty, excluded by filterRowCells, or rejected by filterRow.
        if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
          results.clear();
          boolean moreRows = nextRow(currentRow, offset, length);
          if (!moreRows) return false;

          // This row was rejected; try the next one unless we are at the stop row.
          if (!stopRow) continue;
          return false;
        }

        // The row passed the filter; now add cells from on-demand families, if any.
        // The joined heap is only consulted when it may contain this row: either its
        // next cell is already on this row, or a seek to the row's first key lands on it.
        if (this.joinedHeap != null) {
          KeyValue nextJoinedKv = joinedHeap.peek();
          // If joinedHeap is pointing to some other row, try to seek to a correct one.
          boolean mayHaveData =
            (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
            || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
                true, true)
                && joinedHeap.peek() != null
                && joinedHeap.peek().matchingRow(currentRow, offset, length));
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, limit);
          }
        }
      } else {
        // Continue the row whose joined-heap cells were cut off by the limit last call.
        populateFromJoinedHeap(results, limit);
      }

      // The joined heap hit the limit again: return what we have; the row continues.
      if (joinedContinuationRow != null) {
        return true;
      }

      // Nothing collected for this row: advance past it and try again unless at the
      // stop row.
      if (results.isEmpty()) {
        boolean moreRows = nextRow(currentRow, offset, length);
        if (!moreRows) return false;
        if (!stopRow) continue;
      }

      // We are done. Return the result.
      return !stopRow;
    }
  }

  /**
   * Applies the filter's whole-row check. This applies only to filters that do not use
   * filterRowCells (hasFilterRow() == false).
   */
  private boolean filterRow() throws IOException {
    // When hasFilterRow() is true, filterRowCellsWithRet in nextInternal has already
    // made the row-level decision.
    return filter != null && (!filter.hasFilterRow())
        && filter.filterRow();
  }

  private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
    return filter != null
        && filter.filterRowKey(row, offset, length);
  }

  /**
   * Advances the store heap past every cell of the given row, resets the filter, and
   * lets the coprocessor hook decide whether scanning continues.
   * MOCKED_LIST is defined elsewhere in HRegion; presumably a sink that discards cells.
   *
   * @return false if the coprocessor's postScannerFilterRow asks to stop, else true
   */
  protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
    assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
    KeyValue next;
    while ((next = this.storeHeap.peek()) != null &&
           next.matchingRow(currentRow, offset, length)) {
      this.storeHeap.next(MOCKED_LIST);
    }
    resetFilters();
    // Calling the hook in CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
        || this.region.getCoprocessorHost()
            .postScannerFilterRow(this, currentRow, offset, length);
  }

  /**
   * @return true if there is no current row, or the row has reached the stop row
   *     (exclusive for scans, inclusive for get-scans via the isScan threshold)
   */
  protected boolean isStopRow(byte[] currentRow, int offset, short length) {
    return currentRow == null ||
        (stopRow != null &&
        comparator.compareRows(stopRow, 0, stopRow.length,
            currentRow, offset, length) <= isScan);
  }

  /**
   * Closes both heaps, unregisters this scanner's read point, and marks the scanner
   * closed so later next() calls fail.
   */
  @Override
  public synchronized void close() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  KeyValueHeap getStoreHeapForTesting() {
    return storeHeap;
  }

  /**
   * Seeks both heaps to the first key of {@code row} (via requestSeek), inside a region
   * operation. The joined-heap seek is always performed, even if the store-heap seek
   * already returned true.
   *
   * @return true if either heap's requestSeek returned true
   * @throws IllegalArgumentException if row is null
   */
  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    if (row == null) {
      throw new IllegalArgumentException("Row cannot be null.");
    }
    boolean result = false;
    startRegionOperation();
    try {
      KeyValue kv = KeyValue.createFirstOnRow(row);
      // use request seek to make use of the lazy seek option.
      result = this.storeHeap.requestSeek(kv, true, true);
      if (this.joinedHeap != null) {
        result = this.joinedHeap.requestSeek(kv, true, true) || result;
      }
    } finally {
      closeRegionOperation();
    }
    return result;
  }
}
4339
4340
4341
4342
4343
4344
4345
4346
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361 static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4362 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4363 RegionServerServices rsServices) {
4364 try {
4365 @SuppressWarnings("unchecked")
4366 Class<? extends HRegion> regionClass =
4367 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4368
4369 Constructor<? extends HRegion> c =
4370 regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4371 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4372 RegionServerServices.class);
4373
4374 return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4375 } catch (Throwable e) {
4376
4377 throw new IllegalStateException("Could not instantiate a region instance.", e);
4378 }
4379 }
4380
4381
4382
4383
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393
4394
4395
4396
4397
4398
4399 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4400 final Configuration conf, final HTableDescriptor hTableDescriptor)
4401 throws IOException {
4402 return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4403 }
4404
4405
4406
4407
4408
4409
4410
4411
4412
4413
4414
4415
4416 public static void closeHRegion(final HRegion r) throws IOException {
4417 if (r == null) return;
4418 r.close();
4419 if (r.getLog() == null) return;
4420 r.getLog().closeAndDelete();
4421 }
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431
4432
4433
4434
4435
4436
4437
4438 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4439 final Configuration conf,
4440 final HTableDescriptor hTableDescriptor,
4441 final HLog hlog,
4442 final boolean initialize)
4443 throws IOException {
4444 return createHRegion(info, rootDir, conf, hTableDescriptor,
4445 hlog, initialize, false);
4446 }
4447
4448
4449
4450
4451
4452
4453
4454
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4465 final Configuration conf,
4466 final HTableDescriptor hTableDescriptor,
4467 final HLog hlog,
4468 final boolean initialize, final boolean ignoreHLog)
4469 throws IOException {
4470 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4471 return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4472 }
4473
4474
4475
4476
4477
4478
4479
4480
4481
4482
4483
4484
4485
4486
4487
4488
4489
4490
4491 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4492 final Configuration conf,
4493 final HTableDescriptor hTableDescriptor,
4494 final HLog hlog,
4495 final boolean initialize, final boolean ignoreHLog)
4496 throws IOException {
4497 LOG.info("creating HRegion " + info.getTable().getNameAsString()
4498 + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4499 " Table name == " + info.getTable().getNameAsString());
4500 FileSystem fs = FileSystem.get(conf);
4501 HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4502 HLog effectiveHLog = hlog;
4503 if (hlog == null && !ignoreHLog) {
4504 effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4505 HConstants.HREGION_LOGDIR_NAME, conf);
4506 }
4507 HRegion region = HRegion.newHRegion(tableDir,
4508 effectiveHLog, fs, conf, info, hTableDescriptor, null);
4509 if (initialize) {
4510
4511
4512 region.setSequenceId(region.initialize());
4513 }
4514 return region;
4515 }
4516
4517 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4518 final Configuration conf,
4519 final HTableDescriptor hTableDescriptor,
4520 final HLog hlog)
4521 throws IOException {
4522 return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4523 }
4524
4525
4526
4527
4528
4529
4530
4531
4532
4533
4534
4535
4536
4537
4538 public static HRegion openHRegion(final HRegionInfo info,
4539 final HTableDescriptor htd, final HLog wal,
4540 final Configuration conf)
4541 throws IOException {
4542 return openHRegion(info, htd, wal, conf, null, null);
4543 }
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
4558
4559
4560 public static HRegion openHRegion(final HRegionInfo info,
4561 final HTableDescriptor htd, final HLog wal, final Configuration conf,
4562 final RegionServerServices rsServices,
4563 final CancelableProgressable reporter)
4564 throws IOException {
4565 return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4566 }
4567
4568
4569
4570
4571
4572
4573
4574
4575
4576
4577
4578
4579
4580
4581 public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4582 final HTableDescriptor htd, final HLog wal, final Configuration conf)
4583 throws IOException {
4584 return openHRegion(rootDir, info, htd, wal, conf, null, null);
4585 }
4586
4587
4588
4589
4590
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601
4602 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4603 final HTableDescriptor htd, final HLog wal, final Configuration conf,
4604 final RegionServerServices rsServices,
4605 final CancelableProgressable reporter)
4606 throws IOException {
4607 FileSystem fs = null;
4608 if (rsServices != null) {
4609 fs = rsServices.getFileSystem();
4610 }
4611 if (fs == null) {
4612 fs = FileSystem.get(conf);
4613 }
4614 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4615 }
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4632 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4633 throws IOException {
4634 return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4635 }
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4654 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4655 final RegionServerServices rsServices, final CancelableProgressable reporter)
4656 throws IOException {
4657 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4658 return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4659 }
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4678 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4679 final RegionServerServices rsServices, final CancelableProgressable reporter)
4680 throws IOException {
4681 if (info == null) throw new NullPointerException("Passed region info is null");
4682 if (LOG.isDebugEnabled()) {
4683 LOG.debug("Opening region: " + info);
4684 }
4685 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4686 return r.openHRegion(reporter);
4687 }
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4698 throws IOException {
4699 HRegionFileSystem regionFs = other.getRegionFileSystem();
4700 HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4701 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4702 return r.openHRegion(reporter);
4703 }
4704
4705
4706
4707
4708
4709
4710
4711
4712 protected HRegion openHRegion(final CancelableProgressable reporter)
4713 throws IOException {
4714
4715 checkCompressionCodecs();
4716
4717
4718 checkEncryption();
4719
4720 checkClassLoading();
4721 this.openSeqNum = initialize(reporter);
4722 this.setSequenceId(openSeqNum);
4723 return this;
4724 }
4725
4726 private void checkCompressionCodecs() throws IOException {
4727 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4728 CompressionTest.testCompression(fam.getCompression());
4729 CompressionTest.testCompression(fam.getCompactionCompression());
4730 }
4731 }
4732
4733 private void checkEncryption() throws IOException {
4734 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4735 EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
4736 }
4737 }
4738
4739 private void checkClassLoading() throws IOException {
4740 RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
4741 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
4742 }
4743
4744
4745
4746
4747
4748
4749 HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4750
4751 fs.commitDaughterRegion(hri);
4752
4753
4754 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4755 this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4756 r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4757 r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4758 return r;
4759 }
4760
4761
4762
4763
4764
4765
4766
4767
4768 HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4769 final HRegion region_b) throws IOException {
4770 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4771 fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4772 this.getTableDesc(), this.rsServices);
4773 r.readRequestsCount.set(this.getReadRequestsCount()
4774 + region_b.getReadRequestsCount());
4775 r.writeRequestsCount.set(this.getWriteRequestsCount()
4776
4777 + region_b.getWriteRequestsCount());
4778 this.fs.commitMergedRegion(mergedRegionInfo);
4779 return r;
4780 }
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
4793 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4794 meta.checkResources();
4795
4796 byte[] row = r.getRegionName();
4797 final long now = EnvironmentEdgeManager.currentTimeMillis();
4798 final List<Cell> cells = new ArrayList<Cell>(2);
4799 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4800 HConstants.REGIONINFO_QUALIFIER, now,
4801 r.getRegionInfo().toByteArray()));
4802
4803 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4804 HConstants.META_VERSION_QUALIFIER, now,
4805 Bytes.toBytes(HConstants.META_VERSION)));
4806 meta.put(row, HConstants.CATALOG_FAMILY, cells);
4807 }
4808
4809
4810
4811
4812
4813
4814
4815
4816 @Deprecated
4817 public static Path getRegionDir(final Path tabledir, final String name) {
4818 return new Path(tabledir, name);
4819 }
4820
4821
4822
4823
4824
4825
4826
4827
4828 @Deprecated
4829 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4830 return new Path(
4831 FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4832 }
4833
4834
4835
4836
4837
4838
4839
4840
4841
4842 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4843 return ((info.getStartKey().length == 0) ||
4844 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4845 ((info.getEndKey().length == 0) ||
4846 (Bytes.compareTo(info.getEndKey(), row) > 0));
4847 }
4848
4849
4850
4851
4852
4853
4854
4855
4856
4857 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4858 throws IOException {
4859 HRegion a = srcA;
4860 HRegion b = srcB;
4861
4862
4863
4864 if (srcA.getStartKey() == null) {
4865 if (srcB.getStartKey() == null) {
4866 throw new IOException("Cannot merge two regions with null start key");
4867 }
4868
4869 } else if ((srcB.getStartKey() == null) ||
4870 (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4871 a = srcB;
4872 b = srcA;
4873 }
4874
4875 if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4876 throw new IOException("Cannot merge non-adjacent regions");
4877 }
4878 return merge(a, b);
4879 }
4880
4881
4882
4883
4884
4885
4886
4887
4888
4889 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4890 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4891 throw new IOException("Regions do not belong to the same table");
4892 }
4893
4894 FileSystem fs = a.getRegionFileSystem().getFileSystem();
4895
4896 a.flushcache();
4897 b.flushcache();
4898
4899
4900 a.compactStores(true);
4901 if (LOG.isDebugEnabled()) {
4902 LOG.debug("Files for region: " + a);
4903 a.getRegionFileSystem().logFileSystemState(LOG);
4904 }
4905 b.compactStores(true);
4906 if (LOG.isDebugEnabled()) {
4907 LOG.debug("Files for region: " + b);
4908 b.getRegionFileSystem().logFileSystemState(LOG);
4909 }
4910
4911 RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4912 if (!rmt.prepare(null)) {
4913 throw new IOException("Unable to merge regions " + a + " and " + b);
4914 }
4915 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4916 LOG.info("starting merge of regions: " + a + " and " + b
4917 + " into new region " + mergedRegionInfo.getRegionNameAsString()
4918 + " with start key <"
4919 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4920 + "> and end key <"
4921 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4922 HRegion dstRegion;
4923 try {
4924 dstRegion = rmt.execute(null, null);
4925 } catch (IOException ioe) {
4926 rmt.rollback(null, null);
4927 throw new IOException("Failed merging region " + a + " and " + b
4928 + ", and succssfully rolled back");
4929 }
4930 dstRegion.compactStores(true);
4931
4932 if (LOG.isDebugEnabled()) {
4933 LOG.debug("Files for new region");
4934 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4935 }
4936
4937 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4938 throw new IOException("Merged region " + dstRegion
4939 + " still has references after the compaction, is compaction canceled?");
4940 }
4941
4942
4943 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4944
4945 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4946
4947 LOG.info("merge completed. New region is " + dstRegion);
4948 return dstRegion;
4949 }
4950
4951
4952
4953
4954
4955 boolean isMajorCompaction() throws IOException {
4956 for (Store store : this.stores.values()) {
4957 if (store.isMajorCompaction()) {
4958 return true;
4959 }
4960 }
4961 return false;
4962 }
4963
4964
4965
4966
4967
4968
4969
4970
4971
4972 public Result get(final Get get) throws IOException {
4973 checkRow(get.getRow(), "Get");
4974
4975 if (get.hasFamilies()) {
4976 for (byte [] family: get.familySet()) {
4977 checkFamily(family);
4978 }
4979 } else {
4980 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4981 get.addFamily(family);
4982 }
4983 }
4984 List<Cell> results = get(get, true);
4985 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
4986 }
4987
4988
4989
4990
4991
4992
4993 public List<Cell> get(Get get, boolean withCoprocessor)
4994 throws IOException {
4995
4996 List<Cell> results = new ArrayList<Cell>();
4997
4998
4999 if (withCoprocessor && (coprocessorHost != null)) {
5000 if (coprocessorHost.preGet(get, results)) {
5001 return results;
5002 }
5003 }
5004
5005 Scan scan = new Scan(get);
5006
5007 RegionScanner scanner = null;
5008 try {
5009 scanner = getScanner(scan);
5010 scanner.next(results);
5011 } finally {
5012 if (scanner != null)
5013 scanner.close();
5014 }
5015
5016
5017 if (withCoprocessor && (coprocessorHost != null)) {
5018 coprocessorHost.postGet(get, results);
5019 }
5020
5021
5022 if (this.metricsRegion != null) {
5023 long totalSize = 0l;
5024 if (results != null) {
5025 for (Cell kv:results) {
5026 totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
5027 }
5028 }
5029 this.metricsRegion.updateGet(totalSize);
5030 }
5031
5032 return results;
5033 }
5034
5035 public void mutateRow(RowMutations rm) throws IOException {
5036
5037 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
5038 }
5039
5040
5041
5042
5043
5044 public void mutateRowsWithLocks(Collection<Mutation> mutations,
5045 Collection<byte[]> rowsToLock) throws IOException {
5046 mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
5047 }
5048
5049
5050
5051
5052
5053
5054
5055
5056
5057
5058
5059
5060
5061 public void mutateRowsWithLocks(Collection<Mutation> mutations,
5062 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
5063 MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
5064 processRowsWithLocks(proc, -1, nonceGroup, nonce);
5065 }
5066
5067
5068
5069
5070 public ClientProtos.RegionLoadStats getRegionStats() {
5071 if (!regionStatsEnabled) {
5072 return null;
5073 }
5074 ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
5075 stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
5076 .memstoreFlushSize)));
5077 stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
5078 return stats.build();
5079 }
5080
5081
5082
5083
5084
5085
5086
5087
5088 public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
5089 throws IOException {
5090 processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
5091 }
5092
5093
5094
5095
5096
5097
5098
5099
5100
5101
/**
 * Runs a {@link RowProcessor} against this region.
 * <p>
 * Read-only processors run without row locks, the updates lock, MVCC or WAL. For other
 * processors the sequence is:
 * <ol>
 * <li>acquire a row lock for every row in {@code getRowsToLock()};</li>
 * <li>take the updates read lock;</li>
 * <li>run the processor (optionally with a timeout);</li>
 * <li>add the produced mutations to the memstore under a new MVCC write entry;</li>
 * <li>append the WAL edit (if any);</li>
 * <li>release the locks;</li>
 * <li>sync the WAL according to the processor's durability.</li>
 * </ol>
 * If the WAL sync does not complete, the cells of {@code mutations} are rolled back from the
 * stores.
 *
 * @param processor the processor to run
 * @param timeout time limit in milliseconds for the processor's {@code process} step; a
 *     negative value runs it inline with no limit
 * @param nonceGroup nonce group passed to the WAL append
 * @param nonce nonce passed to the WAL append
 * @throws IOException if a precondition check, the processor, or a WAL operation fails
 */
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
    long nonceGroup, long nonce) throws IOException {

  // Validate every row the processor wants to lock before doing any work.
  for (byte[] row : processor.getRowsToLock()) {
    checkRow(row, "processRowsWithLocks");
  }
  // Only writing processors need the read-only check.
  if (!processor.readOnly()) {
    checkReadOnly();
  }
  checkResources();

  startRegionOperation();
  WALEdit walEdit = new WALEdit();

  // preProcess runs before the try/finally below that closes the region operation, so the
  // operation must be closed by hand if it fails here.
  try {
    processor.preProcess(this, walEdit);
  } catch (IOException e) {
    closeRegionOperation();
    throw e;
  }

  // Read-only fast path: no row locks, updates lock, MVCC entry or WAL append.
  if (processor.readOnly()) {
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();
      doProcessRowWithTimeout(
          processor, now, this, null, null, timeout);
      processor.postProcess(this, walEdit, true);
    } catch (IOException e) {
      throw e;
    } finally {
      closeRegionOperation();
    }
    return;
  }

  MultiVersionConsistencyControl.WriteEntry writeEntry = null;
  boolean locked = false;
  boolean walSyncSuccessful = false;
  List<RowLock> acquiredRowLocks = null;
  long addedSize = 0;
  List<Mutation> mutations = new ArrayList<Mutation>();
  Collection<byte[]> rowsToLock = processor.getRowsToLock();
  try {
    // 1. Acquire one row lock per row, in the order the processor reports them.
    acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
    for (byte[] row : rowsToLock) {

      acquiredRowLocks.add(getRowLock(row));
    }

    // 2. Take the updates read lock. The wait multiplier is the number of row locks held
    //    (at least 1).
    lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
    locked = true;

    long now = EnvironmentEdgeManager.currentTimeMillis();
    try {
      // 3. Let the processor fill in `mutations` and `walEdit`.
      doProcessRowWithTimeout(
          processor, now, this, mutations, walEdit, timeout);

      if (!mutations.isEmpty()) {
        // 4. Start an MVCC write; every new cell is stamped with its write number.
        writeEntry = mvcc.beginMemstoreInsert();

        processor.preBatchMutate(this, walEdit);

        // 5. Add each mutation's cells to its store, accumulating the added size.
        for (Mutation m : mutations) {

          rewriteCellTags(m.getFamilyCellMap(), m);

          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
            kv.setMvccVersion(writeEntry.getWriteNumber());
            byte[] family = kv.getFamily();
            checkFamily(family);
            addedSize += stores.get(family).add(kv);
          }
        }

        long txid = 0;
        // 6. Append to the WAL (not yet synced) while the locks are still held.
        if (!walEdit.isEmpty()) {
          txid = this.log.appendNoSync(this.getRegionInfo(),
              this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
              this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
        }

        // 7. Release the updates lock and the row locks before the sync, so the sync does not
        //    run while they are held.
        if (locked) {
          this.updatesLock.readLock().unlock();
          locked = false;
        }


        releaseRowLocks(acquiredRowLocks);

        // 8. Sync the WAL according to the processor's durability.
        if (txid != 0) {
          syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
        }
        walSyncSuccessful = true;

        processor.postBatchMutate(this);
      }
    } finally {
      // If anything failed before the sync completed, undo the memstore additions. This walks
      // every cell in `mutations`, including any that step 5 had not reached yet.
      // NOTE(review): the message concatenates a byte[] directly, so the log shows the array
      // identity rather than the row key.
      if (!mutations.isEmpty() && !walSyncSuccessful) {
        LOG.warn("Wal sync failed. Roll back " + mutations.size() +
            " memstore keyvalues for row(s):" +
            processor.getRowsToLock().iterator().next() + "...");
        for (Mutation m : mutations) {
          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
            stores.get(kv.getFamily()).rollback(kv);
          }
        }
      }

      // Complete the MVCC write (if one was started) and release any locks still held.
      if (writeEntry != null) {
        mvcc.completeMemstoreInsert(writeEntry);
        writeEntry = null;
      }
      if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }

      // NOTE(review): on the success path releaseRowLocks was already called in step 7;
      // presumably it tolerates a second call. Confirm against its implementation.
      releaseRowLocks(acquiredRowLocks);
    }


    processor.postProcess(this, walEdit, walSyncSuccessful);

  } catch (IOException e) {
    throw e;
  } finally {
    closeRegionOperation();
    // Add the memstore growth to the region's accounting; request a flush if isFlushSize says so.
    if (!mutations.isEmpty() &&
        isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
      requestFlush();
    }
  }
}
5244
5245 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5246 final long now,
5247 final HRegion region,
5248 final List<Mutation> mutations,
5249 final WALEdit walEdit,
5250 final long timeout) throws IOException {
5251
5252 if (timeout < 0) {
5253 try {
5254 processor.process(now, region, mutations, walEdit);
5255 } catch (IOException e) {
5256 LOG.warn("RowProcessor:" + processor.getClass().getName() +
5257 " throws Exception on row(s):" +
5258 Bytes.toStringBinary(
5259 processor.getRowsToLock().iterator().next()) + "...", e);
5260 throw e;
5261 }
5262 return;
5263 }
5264
5265
5266 FutureTask<Void> task =
5267 new FutureTask<Void>(new Callable<Void>() {
5268 @Override
5269 public Void call() throws IOException {
5270 try {
5271 processor.process(now, region, mutations, walEdit);
5272 return null;
5273 } catch (IOException e) {
5274 LOG.warn("RowProcessor:" + processor.getClass().getName() +
5275 " throws Exception on row(s):" +
5276 Bytes.toStringBinary(
5277 processor.getRowsToLock().iterator().next()) + "...", e);
5278 throw e;
5279 }
5280 }
5281 });
5282 rowProcessorExecutor.execute(task);
5283 try {
5284 task.get(timeout, TimeUnit.MILLISECONDS);
5285 } catch (TimeoutException te) {
5286 LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5287 Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5288 "...");
5289 throw new IOException(te);
5290 } catch (Exception e) {
5291 throw new IOException(e);
5292 }
5293 }
5294
5295 public Result append(Append append) throws IOException {
5296 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5297 }
5298
5299
5300
5301
5302
5303
5304
5305
5306
5307
5308
/**
 * Appends values to existing cells of a single row.
 * <p>
 * The whole operation runs under the row lock and the updates read lock. For each requested
 * column the current cell is read. If one exists, a new Put cell is built whose value is the
 * old value followed by the appended bytes, and whose tags are the old tags, then the new
 * cell's tags, then an optional TTL tag. If none exists, the appended cell is written as-is
 * (plus an optional TTL tag). New cells are stamped with a fresh MVCC write number, appended
 * to the WAL (unless durability is SKIP_WAL) and added to the memstore. The WAL is synced
 * after the locks are released.
 *
 * @param append the values to append
 * @param nonceGroup nonce group recorded with the WAL edit
 * @param nonce nonce recorded with the WAL edit
 * @return the new cells if {@code append.isReturnResults()}, otherwise null; or the result of a
 *     coprocessor {@code preAppendAfterRowLock} hook that short-circuits the operation
 * @throws IOException if a precondition check, the read, or a WAL operation fails
 */
public Result append(Append append, long nonceGroup, long nonce)
    throws IOException {
  byte[] row = append.getRow();
  checkRow(row, "append");
  boolean flush = false;
  Durability durability = getEffectiveDurability(append.getDurability());
  boolean writeToWAL = durability != Durability.SKIP_WAL;
  WALEdit walEdits = null;
  List<Cell> allKVs = new ArrayList<Cell>(append.size());
  // New cells per store, applied to the memstore only after the WAL append.
  Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();

  long size = 0;
  long txid = 0;

  checkReadOnly();
  checkResources();

  startRegionOperation(Operation.APPEND);
  this.writeRequestsCount.increment();
  WriteEntry w = null;
  RowLock rowLock;
  try {
    rowLock = getRowLock(row);
    try {
      lock(this.updatesLock.readLock());
      try {
        // NOTE(review): begin+complete with nothing in between presumably waits for earlier
        // in-flight MVCC writes, so the read below sees them. Confirm against
        // MultiVersionConsistencyControl.
        mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
        if (this.coprocessorHost != null) {
          Result r = this.coprocessorHost.preAppendAfterRowLock(append);
          if(r!= null) {
            return r;
          }
        }

        w = mvcc.beginMemstoreInsert();
        long now = EnvironmentEdgeManager.currentTimeMillis();

        for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {

          Store store = stores.get(family.getKey());
          List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

          // Sort the requested cells with the store comparator so they line up with the Get
          // results, which the idx walk below matches by position.
          Collections.sort(family.getValue(), store.getComparator());

          // Read the current value of every requested column in this family.
          Get get = new Get(row);
          for (Cell cell : family.getValue()) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
            get.addColumn(family.getKey(), kv.getQualifier());
          }
          List<Cell> results = get(get, false);

          // idx points at the next unconsumed cell in `results`.
          int idx = 0;
          for (Cell cell : family.getValue()) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
            KeyValue newKv;
            KeyValue oldKv = null;
            if (idx < results.size()
                && CellUtil.matchingQualifier(results.get(idx), kv)) {
              // An existing cell: build old value + appended value as a new Put cell.
              oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
              // Never write a timestamp older than the existing cell's.
              long ts = Math.max(now, oldKv.getTimestamp());

              // Tags: old cell's tags first, then the appended cell's, then an optional TTL.
              List<Tag> newTags = new ArrayList<Tag>();

              if (oldKv.getTagsLengthUnsigned() > 0) {
                Iterator<Tag> i = CellUtil.tagsIterator(oldKv.getTagsArray(),
                    oldKv.getTagsOffset(), oldKv.getTagsLengthUnsigned());
                while (i.hasNext()) {
                  newTags.add(i.next());
                }
              }
              if (kv.getTagsLengthUnsigned() > 0) {
                Iterator<Tag> i = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
                    kv.getTagsLengthUnsigned());
                while (i.hasNext()) {
                  newTags.add(i.next());
                }
              }

              // Long.MAX_VALUE means the Append carries no TTL.
              if (append.getTTL() != Long.MAX_VALUE) {

                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
              }


              byte[] tagBytes = Tag.fromList(newTags);

              // Allocate the new cell with room for the concatenated value and the tags, then
              // copy row, family, qualifier, both value parts and the tags into it.
              newKv = new KeyValue(row.length, kv.getFamilyLength(),
                  kv.getQualifierLength(), ts, KeyValue.Type.Put,
                  oldKv.getValueLength() + kv.getValueLength(),
                  tagBytes.length);

              System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
                  newKv.getRowArray(), newKv.getRowOffset(), kv.getRowLength());
              System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
                  newKv.getFamilyArray(), newKv.getFamilyOffset(),
                  kv.getFamilyLength());
              System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
                  newKv.getQualifierArray(), newKv.getQualifierOffset(),
                  kv.getQualifierLength());

              System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
                  newKv.getValueArray(), newKv.getValueOffset(),
                  oldKv.getValueLength());
              System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
                  newKv.getValueArray(),
                  newKv.getValueOffset() + oldKv.getValueLength(),
                  kv.getValueLength());

              System.arraycopy(tagBytes, 0, newKv.getTagsArray(), newKv.getTagsOffset(),
                  tagBytes.length);
              idx++;
            } else {
              // No existing cell: write the appended cell itself, stamped with `now`.
              // NOTE(review): updateLatestStamp presumably only replaces a LATEST_TIMESTAMP
              // placeholder. It mutates the caller's cell in place.
              kv.updateLatestStamp(Bytes.toBytes(now));

              // Copy the cell only when a TTL tag must be attached.
              if (append.getTTL() != Long.MAX_VALUE) {
                List<Tag> newTags = new ArrayList<Tag>(1);
                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));

                newKv = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
                    kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
                    kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
                    kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
                    kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
                    newTags);
              } else {
                newKv = kv;
              }
            }
            newKv.setMvccVersion(w.getWriteNumber());

            // Give coprocessors a chance to replace the cell before it is logged.
            if (coprocessorHost != null) {
              newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
                  RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKv));
            }
            kvs.add(newKv);

            if (writeToWAL) {
              if (walEdits == null) {
                walEdits = new WALEdit();
              }
              walEdits.add(newKv);
            }
          }

          tempMemstore.put(store, kvs);
        }

        // Append to the WAL (not yet synced) while the locks are held.
        // NOTE(review): if the Append has no cells, walEdits is still null here when writeToWAL
        // is set. Confirm appendNoSync tolerates that.
        if (writeToWAL) {
          txid = this.log.appendNoSync(this.getRegionInfo(),
              this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
              EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
              true, nonceGroup, nonce);
        } else {
          recordMutationWithoutWal(append.getFamilyCellMap());
        }

        // Apply to the memstore: single-version families use upsert, others use add.
        // tempMemstore is a HashMap, so store iteration (and hence allKVs) order is unspecified.
        for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
          Store store = entry.getKey();
          if (store.getFamily().getMaxVersions() == 1) {

            size += store.upsert(entry.getValue(), getSmallestReadPoint());
          } else {

            for (Cell cell: entry.getValue()) {
              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
              size += store.add(kv);
            }
          }
          allKVs.addAll(entry.getValue());
        }
        size = this.addAndGetGlobalMemstoreSize(size);
        flush = isFlushSize(size);
      } finally {
        this.updatesLock.readLock().unlock();
      }
    } finally {
      rowLock.release();
    }
    // Sync only after the row lock and updates lock have been released.
    if (writeToWAL) {

      syncOrDefer(txid, durability);
    }
  } finally {
    if (w != null) {
      mvcc.completeMemstoreInsert(w);
    }
    closeRegionOperation(Operation.APPEND);
  }

  if (this.metricsRegion != null) {
    this.metricsRegion.updateAppend();
  }

  if (flush) {

    requestFlush();
  }


  return append.isReturnResults() ? Result.create(allKVs) : null;
}
5540
5541 public Result increment(Increment increment) throws IOException {
5542 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5543 }
5544
5545
5546
5547
5548
5549
5550
5551
5552
5553
5554 public Result increment(Increment increment, long nonceGroup, long nonce)
5555 throws IOException {
5556 byte [] row = increment.getRow();
5557 checkRow(row, "increment");
5558 TimeRange tr = increment.getTimeRange();
5559 boolean flush = false;
5560 Durability durability = getEffectiveDurability(increment.getDurability());
5561 boolean writeToWAL = durability != Durability.SKIP_WAL;
5562 WALEdit walEdits = null;
5563 List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5564 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5565
5566 long size = 0;
5567 long txid = 0;
5568
5569 checkReadOnly();
5570 checkResources();
5571
5572 startRegionOperation(Operation.INCREMENT);
5573 this.writeRequestsCount.increment();
5574 WriteEntry w = null;
5575 try {
5576 RowLock rowLock = getRowLock(row);
5577 try {
5578 lock(this.updatesLock.readLock());
5579 try {
5580
5581
5582 mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
5583 if (this.coprocessorHost != null) {
5584 Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5585 if (r != null) {
5586 return r;
5587 }
5588 }
5589
5590 w = mvcc.beginMemstoreInsert();
5591 long now = EnvironmentEdgeManager.currentTimeMillis();
5592
5593 for (Map.Entry<byte [], List<Cell>> family:
5594 increment.getFamilyCellMap().entrySet()) {
5595
5596 Store store = stores.get(family.getKey());
5597 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5598
5599
5600
5601
5602
5603 Collections.sort(family.getValue(), store.getComparator());
5604
5605 Get get = new Get(row);
5606 for (Cell cell: family.getValue()) {
5607 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5608 get.addColumn(family.getKey(), kv.getQualifier());
5609 }
5610 get.setTimeRange(tr.getMin(), tr.getMax());
5611 List<Cell> results = get(get, false);
5612
5613
5614
5615 int idx = 0;
5616 List<Cell> edits = family.getValue();
5617 for (int i = 0; i < edits.size(); i++) {
5618 Cell cell = edits.get(i);
5619 long amount = Bytes.toLong(CellUtil.cloneValue(cell));
5620 boolean noWriteBack = (amount == 0);
5621 List<Tag> newTags = new ArrayList<Tag>();
5622
5623
5624 if (cell.getTagsLengthUnsigned() > 0) {
5625 Iterator<Tag> itr = CellUtil.tagsIterator(cell.getTagsArray(),
5626 cell.getTagsOffset(), cell.getTagsLengthUnsigned());
5627 while (itr.hasNext()) {
5628 newTags.add(itr.next());
5629 }
5630 }
5631
5632 Cell c = null;
5633 long ts = now;
5634 if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
5635 c = results.get(idx);
5636 ts = Math.max(now, c.getTimestamp());
5637 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5638 amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5639 } else {
5640
5641 throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5642 "Attempted to increment field that isn't 64 bits wide");
5643 }
5644
5645 if (c.getTagsLength() > 0) {
5646 Iterator<Tag> itr = CellUtil.tagsIterator(c.getTagsArray(),
5647 c.getTagsOffset(), c.getTagsLength());
5648 while (itr.hasNext()) {
5649 newTags.add(itr.next());
5650 }
5651 }
5652 if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
5653 idx++;
5654 }
5655
5656
5657 byte[] q = CellUtil.cloneQualifier(cell);
5658 byte[] val = Bytes.toBytes(amount);
5659
5660
5661 if (increment.getTTL() != Long.MAX_VALUE) {
5662 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
5663 }
5664
5665 KeyValue newKv = new KeyValue(row, 0, row.length,
5666 family.getKey(), 0, family.getKey().length,
5667 q, 0, q.length,
5668 ts,
5669 KeyValue.Type.Put,
5670 val, 0, val.length,
5671 newTags);
5672
5673 newKv.setMvccVersion(w.getWriteNumber());
5674
5675
5676 if (coprocessorHost != null) {
5677 newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5678 RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKv));
5679 }
5680 allKVs.add(newKv);
5681
5682 if (!noWriteBack) {
5683 kvs.add(newKv);
5684
5685
5686 if (writeToWAL) {
5687 if (walEdits == null) {
5688 walEdits = new WALEdit();
5689 }
5690 walEdits.add(newKv);
5691 }
5692 }
5693 }
5694
5695
5696 if (!kvs.isEmpty()) {
5697 tempMemstore.put(store, kvs);
5698 }
5699 }
5700
5701
5702 if (walEdits != null && !walEdits.isEmpty()) {
5703 if (writeToWAL) {
5704
5705
5706
5707 txid = this.log.appendNoSync(this.getRegionInfo(),
5708 this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
5709 EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
5710 true, nonceGroup, nonce);
5711 } else {
5712 recordMutationWithoutWal(increment.getFamilyCellMap());
5713 }
5714 }
5715
5716 if (!tempMemstore.isEmpty()) {
5717 for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5718 Store store = entry.getKey();
5719 if (store.getFamily().getMaxVersions() == 1) {
5720
5721 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5722 } else {
5723
5724 for (Cell cell : entry.getValue()) {
5725 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5726 size += store.add(kv);
5727 }
5728 }
5729 }
5730 size = this.addAndGetGlobalMemstoreSize(size);
5731 flush = isFlushSize(size);
5732 }
5733 } finally {
5734 this.updatesLock.readLock().unlock();
5735 }
5736 } finally {
5737 rowLock.release();
5738 }
5739 if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
5740
5741 syncOrDefer(txid, durability);
5742 }
5743 } finally {
5744 if (w != null) {
5745 mvcc.completeMemstoreInsert(w);
5746 }
5747 closeRegionOperation(Operation.INCREMENT);
5748 if (this.metricsRegion != null) {
5749 this.metricsRegion.updateIncrement();
5750 }
5751 }
5752
5753 if (flush) {
5754
5755 requestFlush();
5756 }
5757
5758 return Result.create(allKVs);
5759 }
5760
5761
5762
5763
5764
5765 private void checkFamily(final byte [] family)
5766 throws NoSuchColumnFamilyException {
5767 if (!this.htableDescriptor.hasFamily(family)) {
5768 throw new NoSuchColumnFamilyException("Column family " +
5769 Bytes.toString(family) + " does not exist in region " + this
5770 + " in table " + this.htableDescriptor);
5771 }
5772 }
5773
/**
 * Estimated shallow heap size of an HRegion instance: object header, an array header, and
 * counts of reference, int, long and boolean fields.
 * NOTE(review): the counts presumably mirror this class's field declarations. Update them when
 * fields are added or removed.
 */
public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT +
    ClassSize.ARRAY +
    42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
    (12 * Bytes.SIZEOF_LONG) +
    5 * Bytes.SIZEOF_BOOLEAN);

/**
 * {@link #FIXED_OVERHEAD} plus the estimated size of the helper objects the region owns (atomic
 * counters, concurrent maps, locks, the MVCC, etc.). heapSize() starts from this value and adds
 * each store's own size.
 */
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
    ClassSize.OBJECT +
    (2 * ClassSize.ATOMIC_BOOLEAN) +
    (3 * ClassSize.ATOMIC_LONG) +
    (2 * ClassSize.CONCURRENT_HASHMAP) +
    WriteState.HEAP_SIZE +
    ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
    (2 * ClassSize.REENTRANT_LOCK) +
    ClassSize.ARRAYLIST +
    MultiVersionConsistencyControl.FIXED_SIZE
    + ClassSize.TREEMAP
    + 2 * ClassSize.ATOMIC_INTEGER
    ;
5804
5805 @Override
5806 public long heapSize() {
5807 long heapSize = DEEP_OVERHEAD;
5808 for (Store store : this.stores.values()) {
5809 heapSize += store.heapSize();
5810 }
5811
5812 return heapSize;
5813 }
5814
5815
5816
5817
5818
5819 private static void printUsageAndExit(final String message) {
5820 if (message != null && message.length() > 0) System.out.println(message);
5821 System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
5822 System.out.println("Options:");
5823 System.out.println(" major_compact Pass this option to major compact " +
5824 "passed region.");
5825 System.out.println("Default outputs scan of passed region.");
5826 System.exit(1);
5827 }
5828
5829
5830
5831
5832
5833
5834
5835
5836
5837
5838
5839
5840
5841
5842
5843
5844
5845 public boolean registerService(Service instance) {
5846
5847
5848
5849 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5850 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5851 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5852 " already registered, rejecting request from "+instance
5853 );
5854 return false;
5855 }
5856
5857 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5858 if (LOG.isDebugEnabled()) {
5859 LOG.debug("Registered coprocessor service: region="+
5860 Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5861 }
5862 return true;
5863 }
5864
5865
5866
5867
5868
5869
5870
5871
5872
5873
5874
5875
5876
5877
5878
5879 public Message execService(RpcController controller, CoprocessorServiceCall call)
5880 throws IOException {
5881 String serviceName = call.getServiceName();
5882 String methodName = call.getMethodName();
5883 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5884 throw new UnknownProtocolException(null,
5885 "No registered coprocessor service found for name "+serviceName+
5886 " in region "+Bytes.toStringBinary(getRegionName()));
5887 }
5888
5889 Service service = coprocessorServiceHandlers.get(serviceName);
5890 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5891 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5892 if (methodDesc == null) {
5893 throw new UnknownProtocolException(service.getClass(),
5894 "Unknown method "+methodName+" called on service "+serviceName+
5895 " in region "+Bytes.toStringBinary(getRegionName()));
5896 }
5897
5898 Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5899 .mergeFrom(call.getRequest()).build();
5900
5901 if (coprocessorHost != null) {
5902 request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5903 }
5904
5905 final Message.Builder responseBuilder =
5906 service.getResponsePrototype(methodDesc).newBuilderForType();
5907 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5908 @Override
5909 public void run(Message message) {
5910 if (message != null) {
5911 responseBuilder.mergeFrom(message);
5912 }
5913 }
5914 });
5915
5916 if (coprocessorHost != null) {
5917 coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5918 }
5919
5920 return responseBuilder.build();
5921 }
5922
5923
5924
5925
5926
5927
5928
5929
5930
5931
5932
5933 private static void processTable(final FileSystem fs, final Path p,
5934 final HLog log, final Configuration c,
5935 final boolean majorCompact)
5936 throws IOException {
5937 HRegion region = null;
5938 FSTableDescriptors fst = new FSTableDescriptors(c);
5939
5940 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5941 region = HRegion.newHRegion(p, log, fs, c,
5942 HRegionInfo.FIRST_META_REGIONINFO, fst.get(TableName.META_TABLE_NAME), null);
5943 } else {
5944 throw new IOException("Not a known catalog table: " + p.toString());
5945 }
5946 try {
5947 region.initialize();
5948 if (majorCompact) {
5949 region.compactStores(true);
5950 } else {
5951
5952 Scan scan = new Scan();
5953
5954 RegionScanner scanner = region.getScanner(scan);
5955 try {
5956 List<Cell> kvs = new ArrayList<Cell>();
5957 boolean done;
5958 do {
5959 kvs.clear();
5960 done = scanner.next(kvs);
5961 if (kvs.size() > 0) LOG.info(kvs);
5962 } while (done);
5963 } finally {
5964 scanner.close();
5965 }
5966 }
5967 } finally {
5968 region.close();
5969 }
5970 }
5971
5972 boolean shouldForceSplit() {
5973 return this.splitRequest;
5974 }
5975
5976 byte[] getExplicitSplitPoint() {
5977 return this.explicitSplitPoint;
5978 }
5979
5980 void forceSplit(byte[] sp) {
5981
5982
5983 this.splitRequest = true;
5984 if (sp != null) {
5985 this.explicitSplitPoint = sp;
5986 }
5987 }
5988
5989 void clearSplit() {
5990 this.splitRequest = false;
5991 this.explicitSplitPoint = null;
5992 }
5993
5994
5995
5996
5997 protected void prepareToSplit() {
5998
5999 }
6000
6001
6002
6003
6004
6005
6006
6007 public byte[] checkSplit() {
6008
6009 if (this.getRegionInfo().isMetaTable() ||
6010 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
6011 if (shouldForceSplit()) {
6012 LOG.warn("Cannot split meta region in HBase 0.20 and above");
6013 }
6014 return null;
6015 }
6016
6017
6018 if (this.isRecovering()) {
6019 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
6020 return null;
6021 }
6022
6023 if (!splitPolicy.shouldSplit()) {
6024 return null;
6025 }
6026
6027 byte[] ret = splitPolicy.getSplitPoint();
6028
6029 if (ret != null) {
6030 try {
6031 checkRow(ret, "calculated split");
6032 } catch (IOException e) {
6033 LOG.error("Ignoring invalid split", e);
6034 return null;
6035 }
6036 }
6037 return ret;
6038 }
6039
6040
6041
6042
6043 public int getCompactPriority() {
6044 int count = Integer.MAX_VALUE;
6045 for (Store store : stores.values()) {
6046 count = Math.min(count, store.getCompactPriority());
6047 }
6048 return count;
6049 }
6050
6051
6052
6053
6054
6055
6056 public boolean needsCompaction() {
6057 for (Store store : stores.values()) {
6058 if(store.needsCompaction()) {
6059 return true;
6060 }
6061 }
6062 return false;
6063 }
6064
6065
6066 public RegionCoprocessorHost getCoprocessorHost() {
6067 return coprocessorHost;
6068 }
6069
6070
6071 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
6072 this.coprocessorHost = coprocessorHost;
6073 }
6074
6075
6076
6077
6078
6079
6080
6081
6082 public void startRegionOperation() throws IOException {
6083 startRegionOperation(Operation.ANY);
6084 }
6085
6086
6087
6088
6089
/**
 * Starts a region operation of the given kind.
 * <p>
 * For the listed operation kinds, a recovering region rejects the request unless it is a
 * PUT/DELETE/BATCH_MUTATE and writes are not disallowed during recovery. SPLIT_REGION,
 * MERGE_REGION and COMPACT_REGION then return without any further checks and without taking
 * the region read lock. All other operations fail if the region is closing, take the region
 * read lock, fail (releasing it) if the region is closed, and finally call the coprocessor
 * host's {@code postStartRegionOperation}; on a hook failure the lock is released.
 *
 * @param op the kind of operation being started
 * @throws IOException {@code RegionInRecoveryException}, {@code NotServingRegionException}, a
 *     lock-acquisition failure from {@code lock(...)}, or an IOException wrapping a
 *     coprocessor hook failure
 */
protected void startRegionOperation(Operation op) throws IOException {
  switch (op) {
  case INCREMENT:
  case APPEND:
  case GET:
  case SCAN:
  case SPLIT_REGION:
  case MERGE_REGION:
  case PUT:
  case DELETE:
  case BATCH_MUTATE:
  case COMPACT_REGION:
    // While recovering, only plain writes may proceed, and only when writes are allowed.
    if (isRecovering() && (this.disallowWritesInRecovering ||
        (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
      throw new RegionInRecoveryException(this.getRegionNameAsString() +
          " is recovering; cannot take reads");
    }
    break;
  default:
    break;
  }
  // These operations skip the closing/closed checks and do not take the region read lock.
  if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
      || op == Operation.COMPACT_REGION) {


    return;
  }
  if (this.closing.get()) {
    throw new NotServingRegionException(getRegionNameAsString() + " is closing");
  }
  lock(lock.readLock());
  // Re-check under the lock: the region may have closed while we waited for it.
  if (this.closed.get()) {
    lock.readLock().unlock();
    throw new NotServingRegionException(getRegionNameAsString() + " is closed");
  }
  try {
    if (coprocessorHost != null) {
      coprocessorHost.postStartRegionOperation(op);
    }
  } catch (Exception e) {
    // Do not leak the read lock when the hook fails.
    lock.readLock().unlock();
    throw new IOException(e);
  }
}
6135
6136
6137
6138
6139
6140
6141 public void closeRegionOperation() throws IOException {
6142 closeRegionOperation(Operation.ANY);
6143 }
6144
6145
6146
6147
6148
6149
6150
6151 public void closeRegionOperation(Operation operation) throws IOException {
6152 lock.readLock().unlock();
6153 if (coprocessorHost != null) {
6154 coprocessorHost.postCloseRegionOperation(operation);
6155 }
6156 }
6157
6158
6159
6160
6161
6162
6163
6164
6165
6166
6167 private void startBulkRegionOperation(boolean writeLockNeeded)
6168 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
6169 if (this.closing.get()) {
6170 throw new NotServingRegionException(getRegionNameAsString() + " is closing");
6171 }
6172 if (writeLockNeeded) lock(lock.writeLock());
6173 else lock(lock.readLock());
6174 if (this.closed.get()) {
6175 if (writeLockNeeded) lock.writeLock().unlock();
6176 else lock.readLock().unlock();
6177 throw new NotServingRegionException(getRegionNameAsString() + " is closed");
6178 }
6179 }
6180
6181
6182
6183
6184
6185 private void closeBulkRegionOperation(){
6186 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
6187 else lock.readLock().unlock();
6188 }
6189
6190
6191
6192
6193
6194 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
6195 numMutationsWithoutWAL.increment();
6196 if (numMutationsWithoutWAL.get() <= 1) {
6197 LOG.info("writing data to region " + this +
6198 " with WAL disabled. Data may be lost in the event of a crash.");
6199 }
6200
6201 long mutationSize = 0;
6202 for (List<Cell> cells: familyMap.values()) {
6203 assert cells instanceof RandomAccess;
6204 int listSize = cells.size();
6205 for (int i=0; i < listSize; i++) {
6206 Cell cell = cells.get(i);
6207
6208 mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
6209 }
6210 }
6211
6212 dataInMemoryWithoutWAL.add(mutationSize);
6213 }
6214
6215 private void lock(final Lock lock)
6216 throws RegionTooBusyException, InterruptedIOException {
6217 lock(lock, 1);
6218 }
6219
6220
6221
6222
6223
6224
6225 private void lock(final Lock lock, final int multiplier)
6226 throws RegionTooBusyException, InterruptedIOException {
6227 try {
6228 final long waitTime = Math.min(maxBusyWaitDuration,
6229 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
6230 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
6231 throw new RegionTooBusyException(
6232 "failed to get a lock in " + waitTime + " ms. " +
6233 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
6234 this.getRegionInfo().getRegionNameAsString()) +
6235 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
6236 this.getRegionServerServices().getServerName()));
6237 }
6238 } catch (InterruptedException ie) {
6239 LOG.info("Interrupted while waiting for a lock");
6240 InterruptedIOException iie = new InterruptedIOException();
6241 iie.initCause(ie);
6242 throw iie;
6243 }
6244 }
6245
6246
6247
6248
6249
6250
6251
6252 private void syncOrDefer(long txid, Durability durability) throws IOException {
6253 if (this.getRegionInfo().isMetaRegion()) {
6254 this.log.sync(txid);
6255 } else {
6256 switch(durability) {
6257 case USE_DEFAULT:
6258
6259 if (shouldSyncLog()) {
6260 this.log.sync(txid);
6261 }
6262 break;
6263 case SKIP_WAL:
6264
6265 break;
6266 case ASYNC_WAL:
6267
6268 break;
6269 case SYNC_WAL:
6270 case FSYNC_WAL:
6271
6272 this.log.sync(txid);
6273 break;
6274 }
6275 }
6276 }
6277
6278
6279
6280
6281 private boolean shouldSyncLog() {
6282 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
6283 }
6284
6285
6286
6287
6288 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6289
6290 @Override
6291 public void add(int index, Cell element) {
6292
6293 }
6294
6295 @Override
6296 public boolean addAll(int index, Collection<? extends Cell> c) {
6297 return false;
6298 }
6299
6300 @Override
6301 public KeyValue get(int index) {
6302 throw new UnsupportedOperationException();
6303 }
6304
6305 @Override
6306 public int size() {
6307 return 0;
6308 }
6309 };
6310
6311
6312
6313
6314
6315
6316
6317
6318
6319
6320
6321 public static void main(String[] args) throws IOException {
6322 if (args.length < 1) {
6323 printUsageAndExit(null);
6324 }
6325 boolean majorCompact = false;
6326 if (args.length > 1) {
6327 if (!args[1].toLowerCase().startsWith("major")) {
6328 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6329 }
6330 majorCompact = true;
6331 }
6332 final Path tableDir = new Path(args[0]);
6333 final Configuration c = HBaseConfiguration.create();
6334 final FileSystem fs = FileSystem.get(c);
6335 final Path logdir = new Path(c.get("hbase.tmp.dir"));
6336 final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6337
6338 final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6339 try {
6340 processTable(fs, tableDir, log, c, majorCompact);
6341 } finally {
6342 log.close();
6343
6344 BlockCache bc = new CacheConfig(c).getBlockCache();
6345 if (bc != null) bc.shutdown();
6346 }
6347 }
6348
6349
6350
6351
6352 public long getOpenSeqNum() {
6353 return this.openSeqNum;
6354 }
6355
6356
6357
6358
6359
6360 public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6361 return this.maxSeqIdInStores;
6362 }
6363
6364
6365
6366
6367 public CompactionState getCompactionState() {
6368 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6369 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6370 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6371 }
6372
6373 public void reportCompactionRequestStart(boolean isMajor){
6374 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6375 }
6376
6377 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted){
6378 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6379
6380
6381 compactionsFinished.incrementAndGet();
6382 compactionNumFilesCompacted.addAndGet(numFiles);
6383 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6384
6385 assert newValue >= 0;
6386 }
6387
6388
6389
6390
6391 public AtomicLong getSequenceId() {
6392 return this.sequenceId;
6393 }
6394
6395
6396
6397
6398
6399 private void setSequenceId(long value) {
6400 this.sequenceId.set(value);
6401 }
6402
6403
6404
6405
6406
6407
/**
 * Callback hooks around bulk loading a file into a column family.
 * NOTE(review): the expected call sequence (prepare first, then done or failed) is inferred
 * from the method names; confirm against the bulk-load implementation.
 */
public interface BulkLoadListener {

  /**
   * Presumably invoked before the file at {@code srcPath} is loaded into {@code family}.
   *
   * @param family the column family being loaded
   * @param srcPath the path of the file to load
   * @return a path string; presumably the location that should actually be loaded
   *     (TODO confirm against implementations)
   * @throws IOException if preparation fails
   */
  String prepareBulkLoad(byte[] family, String srcPath) throws IOException;

  /**
   * Presumably invoked after the file at {@code srcPath} has been loaded successfully.
   *
   * @param family the column family that was loaded
   * @param srcPath the path of the loaded file
   * @throws IOException if the completion hook fails
   */
  void doneBulkLoad(byte[] family, String srcPath) throws IOException;

  /**
   * Presumably invoked when loading the file at {@code srcPath} did not succeed.
   *
   * @param family the column family that was being loaded
   * @param srcPath the path of the file that failed to load
   * @throws IOException if the failure hook itself fails
   */
  void failedBulkLoad(byte[] family, String srcPath) throws IOException;
}
6435
6436 @VisibleForTesting class RowLockContext {
6437 private final HashedBytes row;
6438 private final CountDownLatch latch = new CountDownLatch(1);
6439 private final Thread thread;
6440 private int lockCount = 0;
6441
6442 RowLockContext(HashedBytes row) {
6443 this.row = row;
6444 this.thread = Thread.currentThread();
6445 }
6446
6447 boolean ownedByCurrentThread() {
6448 return thread == Thread.currentThread();
6449 }
6450
6451 RowLock newLock() {
6452 lockCount++;
6453 return new RowLock(this);
6454 }
6455
6456 void releaseLock() {
6457 if (!ownedByCurrentThread()) {
6458 throw new IllegalArgumentException("Lock held by thread: " + thread
6459 + " cannot be released by different thread: " + Thread.currentThread());
6460 }
6461 lockCount--;
6462 if (lockCount == 0) {
6463
6464 RowLockContext existingContext = lockedRows.remove(row);
6465 if (existingContext != this) {
6466 throw new RuntimeException(
6467 "Internal row lock state inconsistent, should not happen, row: " + row);
6468 }
6469 latch.countDown();
6470 }
6471 }
6472 }
6473
6474
6475
6476
6477
6478
6479 public static class RowLock {
6480 @VisibleForTesting final RowLockContext context;
6481 private boolean released = false;
6482
6483 @VisibleForTesting RowLock(RowLockContext context) {
6484 this.context = context;
6485 }
6486
6487
6488
6489
6490
6491
6492 public void release() {
6493 if (!released) {
6494 context.releaseLock();
6495 released = true;
6496 }
6497 }
6498 }
6499
6500
6501
6502
6503
6504
6505 public void updatesLock() throws RegionTooBusyException, InterruptedIOException {
6506 lock(updatesLock.readLock());
6507 }
6508
6509
6510
6511
6512
6513 public void updatesUnlock() throws InterruptedIOException {
6514 updatesLock.readLock().unlock();
6515 }
6516 }