1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.lang.management.ManagementFactory;
24 import java.util.ConcurrentModificationException;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.SortedMap;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.DelayQueue;
32 import java.util.concurrent.Delayed;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.locks.ReentrantReadWriteLock;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.DroppedSnapshotException;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.RemoteExceptionHandler;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47 import org.apache.hadoop.hbase.util.HasThread;
48 import org.apache.hadoop.hbase.util.Threads;
49 import org.apache.hadoop.util.StringUtils;
50 import org.cliffc.high_scale_lib.Counter;
51
52 import com.google.common.base.Preconditions;
53 import org.cloudera.htrace.Trace;
54 import org.cloudera.htrace.TraceScope;
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 class MemStoreFlusher implements FlushRequester {
67 static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
68
69
70 private final BlockingQueue<FlushQueueEntry> flushQueue =
71 new DelayQueue<FlushQueueEntry>();
72 private final Map<HRegion, FlushRegionEntry> regionsInQueue =
73 new HashMap<HRegion, FlushRegionEntry>();
74 private AtomicBoolean wakeupPending = new AtomicBoolean();
75
76 private final long threadWakeFrequency;
77 private final HRegionServer server;
78 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
79 private final Object blockSignal = new Object();
80
81 protected final long globalMemStoreLimit;
82 protected final long globalMemStoreLimitLowMark;
83
84 static final float DEFAULT_UPPER = 0.4f;
85 private static final float DEFAULT_LOWER = 0.35f;
86 static final String UPPER_KEY =
87 "hbase.regionserver.global.memstore.upperLimit";
88 private static final String LOWER_KEY =
89 "hbase.regionserver.global.memstore.lowerLimit";
90 private long blockingWaitTime;
91 private final Counter updatesBlockedMsHighWater = new Counter();
92
93 private final FlushHandler[] flushHandlers;
94
95
96
97
98
99 public MemStoreFlusher(final Configuration conf,
100 final HRegionServer server) {
101 super();
102 this.server = server;
103 this.threadWakeFrequency =
104 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
105 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
106 this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
107 UPPER_KEY, conf);
108 long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
109 if (lower > this.globalMemStoreLimit) {
110 lower = this.globalMemStoreLimit;
111 LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
112 "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
113 }
114 this.globalMemStoreLimitLowMark = lower;
115 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
116 90000);
117 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
118 this.flushHandlers = new FlushHandler[handlerCount];
119 LOG.info("globalMemStoreLimit=" +
120 StringUtils.humanReadableInt(this.globalMemStoreLimit) +
121 ", globalMemStoreLimitLowMark=" +
122 StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
123 ", maxHeap=" + StringUtils.humanReadableInt(max));
124 }
125
126
127
128
129
130
131
132
133
134
135 static long globalMemStoreLimit(final long max,
136 final float defaultLimit, final String key, final Configuration c) {
137 float limit = c.getFloat(key, defaultLimit);
138 return getMemStoreLimit(max, limit, defaultLimit);
139 }
140
141 static long getMemStoreLimit(final long max, final float limit,
142 final float defaultLimit) {
143 float effectiveLimit = limit;
144 if (limit >= 0.8f || limit <= 0f) {
145 LOG.warn("Setting global memstore limit to default of " + defaultLimit +
146 " because supplied value outside allowed range of (0 -> 0.8]");
147 effectiveLimit = defaultLimit;
148 }
149 return (long)(max * effectiveLimit);
150 }
151
152 public Counter getUpdatesBlockedMsHighWater() {
153 return this.updatesBlockedMsHighWater;
154 }
155
156
157
158
159
160
161
162 private boolean flushOneForGlobalPressure() {
163 SortedMap<Long, HRegion> regionsBySize =
164 server.getCopyOfOnlineRegionsSortedBySize();
165
166 Set<HRegion> excludedRegions = new HashSet<HRegion>();
167
168 boolean flushedOne = false;
169 while (!flushedOne) {
170
171
172 HRegion bestFlushableRegion = getBiggestMemstoreRegion(
173 regionsBySize, excludedRegions, true);
174
175 HRegion bestAnyRegion = getBiggestMemstoreRegion(
176 regionsBySize, excludedRegions, false);
177
178 if (bestAnyRegion == null) {
179 LOG.error("Above memory mark but there are no flushable regions!");
180 return false;
181 }
182
183 HRegion regionToFlush;
184 if (bestFlushableRegion != null &&
185 bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
186
187
188
189
190 if (LOG.isDebugEnabled()) {
191 LOG.debug("Under global heap pressure: " +
192 "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
193 "store files, but is " +
194 StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
195 " vs best flushable region's " +
196 StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
197 ". Choosing the bigger.");
198 }
199 regionToFlush = bestAnyRegion;
200 } else {
201 if (bestFlushableRegion == null) {
202 regionToFlush = bestAnyRegion;
203 } else {
204 regionToFlush = bestFlushableRegion;
205 }
206 }
207
208 Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
209
210 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
211 flushedOne = flushRegion(regionToFlush, true);
212 if (!flushedOne) {
213 LOG.info("Excluding unflushable region " + regionToFlush +
214 " - trying to find a different region to flush.");
215 excludedRegions.add(regionToFlush);
216 }
217 }
218 return true;
219 }
220
221 private class FlushHandler extends HasThread {
222
223 private FlushHandler(String name) {
224 super(name);
225 }
226
227 @Override
228 public void run() {
229 while (!server.isStopped()) {
230 FlushQueueEntry fqe = null;
231 try {
232 wakeupPending.set(false);
233 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
234 if (fqe == null || fqe instanceof WakeupFlushThread) {
235 if (isAboveLowWaterMark()) {
236 LOG.debug("Flush thread woke up because memory above low water="
237 + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
238 if (!flushOneForGlobalPressure()) {
239
240
241
242
243
244 Thread.sleep(1000);
245 wakeUpIfBlocking();
246 }
247
248 wakeupFlushThread();
249 }
250 continue;
251 }
252 FlushRegionEntry fre = (FlushRegionEntry) fqe;
253 if (!flushRegion(fre)) {
254 break;
255 }
256 } catch (InterruptedException ex) {
257 continue;
258 } catch (ConcurrentModificationException ex) {
259 continue;
260 } catch (Exception ex) {
261 LOG.error("Cache flusher failed for entry " + fqe, ex);
262 if (!server.checkFileSystem()) {
263 break;
264 }
265 }
266 }
267 synchronized (regionsInQueue) {
268 regionsInQueue.clear();
269 flushQueue.clear();
270 }
271
272
273 wakeUpIfBlocking();
274 LOG.info(getName() + " exiting");
275 }
276 }
277
278
279 private void wakeupFlushThread() {
280 if (wakeupPending.compareAndSet(false, true)) {
281 flushQueue.add(new WakeupFlushThread());
282 }
283 }
284
285 private HRegion getBiggestMemstoreRegion(
286 SortedMap<Long, HRegion> regionsBySize,
287 Set<HRegion> excludedRegions,
288 boolean checkStoreFileCount) {
289 synchronized (regionsInQueue) {
290 for (HRegion region : regionsBySize.values()) {
291 if (excludedRegions.contains(region)) {
292 continue;
293 }
294
295 if (region.writestate.flushing || !region.writestate.writesEnabled) {
296 continue;
297 }
298
299 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
300 continue;
301 }
302 return region;
303 }
304 }
305 return null;
306 }
307
308
309
310
311 private boolean isAboveHighWaterMark() {
312 return server.getRegionServerAccounting().
313 getGlobalMemstoreSize() >= globalMemStoreLimit;
314 }
315
316
317
318
319 private boolean isAboveLowWaterMark() {
320 return server.getRegionServerAccounting().
321 getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
322 }
323
324 public void requestFlush(HRegion r) {
325 synchronized (regionsInQueue) {
326 if (!regionsInQueue.containsKey(r)) {
327
328
329 FlushRegionEntry fqe = new FlushRegionEntry(r);
330 this.regionsInQueue.put(r, fqe);
331 this.flushQueue.add(fqe);
332 }
333 }
334 }
335
336 public void requestDelayedFlush(HRegion r, long delay) {
337 synchronized (regionsInQueue) {
338 if (!regionsInQueue.containsKey(r)) {
339
340 FlushRegionEntry fqe = new FlushRegionEntry(r);
341 fqe.requeue(delay);
342 this.regionsInQueue.put(r, fqe);
343 this.flushQueue.add(fqe);
344 }
345 }
346 }
347
348 public int getFlushQueueSize() {
349 return flushQueue.size();
350 }
351
352
353
354
355 void interruptIfNecessary() {
356 lock.writeLock().lock();
357 try {
358 for (FlushHandler flushHander : flushHandlers) {
359 if (flushHander != null) flushHander.interrupt();
360 }
361 } finally {
362 lock.writeLock().unlock();
363 }
364 }
365
366 synchronized void start(UncaughtExceptionHandler eh) {
367 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
368 server.getServerName().toShortString() + "-MemStoreFlusher", eh);
369 for (int i = 0; i < flushHandlers.length; i++) {
370 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
371 flusherThreadFactory.newThread(flushHandlers[i]);
372 flushHandlers[i].start();
373 }
374 }
375
376 boolean isAlive() {
377 for (FlushHandler flushHander : flushHandlers) {
378 if (flushHander != null && flushHander.isAlive()) {
379 return true;
380 }
381 }
382 return false;
383 }
384
385 void join() {
386 for (FlushHandler flushHander : flushHandlers) {
387 if (flushHander != null) {
388 Threads.shutdown(flushHander.getThread());
389 }
390 }
391 }
392
393
394
395
396
397
398
399
400
401 private boolean flushRegion(final FlushRegionEntry fqe) {
402 HRegion region = fqe.region;
403 if (!region.getRegionInfo().isMetaRegion() &&
404 isTooManyStoreFiles(region)) {
405 if (fqe.isMaximumWait(this.blockingWaitTime)) {
406 LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
407 "ms on a compaction to clean up 'too many store files'; waited " +
408 "long enough... proceeding with flush of " +
409 region.getRegionNameAsString());
410 } else {
411
412 if (fqe.getRequeueCount() <= 0) {
413
414 LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
415 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
416 if (!this.server.compactSplitThread.requestSplit(region)) {
417 try {
418 this.server.compactSplitThread.requestSystemCompaction(
419 region, Thread.currentThread().getName());
420 } catch (IOException e) {
421 LOG.error(
422 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
423 RemoteExceptionHandler.checkIOException(e));
424 }
425 }
426 }
427
428
429
430 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
431
432 return true;
433 }
434 }
435 return flushRegion(region, false);
436 }
437
438
439
440
441
442
443
444
445
446
447
448
449
450 private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
451 long startTime = 0;
452 synchronized (this.regionsInQueue) {
453 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
454
455 if (fqe != null) {
456 startTime = fqe.createTime;
457 }
458 if (fqe != null && emergencyFlush) {
459
460
461 flushQueue.remove(fqe);
462 }
463 }
464 if (startTime == 0) {
465
466
467
468 startTime = EnvironmentEdgeManager.currentTimeMillis();
469 }
470 lock.readLock().lock();
471 try {
472 HRegion.FlushResult flushResult = region.flushcache();
473 boolean shouldCompact = flushResult.isCompactionNeeded();
474
475 boolean shouldSplit = region.checkSplit() != null;
476 if (shouldSplit) {
477 this.server.compactSplitThread.requestSplit(region);
478 } else if (shouldCompact) {
479 server.compactSplitThread.requestSystemCompaction(
480 region, Thread.currentThread().getName());
481 }
482 if (flushResult.isFlushSucceeded()) {
483 long endTime = EnvironmentEdgeManager.currentTimeMillis();
484 server.getMetrics().updateFlushTime(endTime - startTime);
485 }
486 } catch (DroppedSnapshotException ex) {
487
488
489
490
491
492 server.abort("Replay of HLog required. Forcing server shutdown", ex);
493 return false;
494 } catch (IOException ex) {
495 LOG.error("Cache flush failed" +
496 (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
497 RemoteExceptionHandler.checkIOException(ex));
498 if (!server.checkFileSystem()) {
499 return false;
500 }
501 } finally {
502 lock.readLock().unlock();
503 wakeUpIfBlocking();
504 }
505 return true;
506 }
507
508 private void wakeUpIfBlocking() {
509 synchronized (blockSignal) {
510 blockSignal.notifyAll();
511 }
512 }
513
514 private boolean isTooManyStoreFiles(HRegion region) {
515 for (Store store : region.stores.values()) {
516 if (store.hasTooManyStoreFiles()) {
517 return true;
518 }
519 }
520 return false;
521 }
522
523
524
525
526
527
528
529 public void reclaimMemStoreMemory() {
530 TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
531 if (isAboveHighWaterMark()) {
532 if (Trace.isTracing()) {
533 scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
534 }
535 long start = System.currentTimeMillis();
536 synchronized (this.blockSignal) {
537 boolean blocked = false;
538 long startTime = 0;
539 while (isAboveHighWaterMark() && !server.isStopped()) {
540 if (!blocked) {
541 startTime = EnvironmentEdgeManager.currentTimeMillis();
542 LOG.info("Blocking updates on " + server.toString() +
543 ": the global memstore size " +
544 StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
545 " is >= than blocking " +
546 StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
547 }
548 blocked = true;
549 wakeupFlushThread();
550 try {
551
552
553 blockSignal.wait(5 * 1000);
554 } catch (InterruptedException ie) {
555 Thread.currentThread().interrupt();
556 }
557 long took = System.currentTimeMillis() - start;
558 LOG.warn("Memstore is above high water mark and block " + took + "ms");
559 }
560 if(blocked){
561 final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
562 if(totalTime > 0){
563 this.updatesBlockedMsHighWater.add(totalTime);
564 }
565 LOG.info("Unblocking updates for server " + server.toString());
566 }
567 }
568 } else if (isAboveLowWaterMark()) {
569 wakeupFlushThread();
570 }
571 scope.close();
572 }
573 @Override
574 public String toString() {
575 return "flush_queue="
576 + flushQueue.size();
577 }
578
579 public String dumpQueue() {
580 StringBuilder queueList = new StringBuilder();
581 queueList.append("Flush Queue Queue dump:\n");
582 queueList.append(" Flush Queue:\n");
583 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
584
585 while(it.hasNext()){
586 queueList.append(" "+it.next().toString());
587 queueList.append("\n");
588 }
589
590 return queueList.toString();
591 }
592
/**
 * Marker type for everything that can be placed on the flush queue. Extends
 * {@link Delayed} because the queue is a {@code DelayQueue}.
 */
interface FlushQueueEntry extends Delayed {}
595
596
597
598
599 static class WakeupFlushThread implements FlushQueueEntry {
600 @Override
601 public long getDelay(TimeUnit unit) {
602 return 0;
603 }
604
605 @Override
606 public int compareTo(Delayed o) {
607 return -1;
608 }
609
610 @Override
611 public boolean equals(Object obj) {
612 return (this == obj);
613 }
614 }
615
616
617
618
619
620
621
622
623
624 static class FlushRegionEntry implements FlushQueueEntry {
625 private final HRegion region;
626
627 private final long createTime;
628 private long whenToExpire;
629 private int requeueCount = 0;
630
631 FlushRegionEntry(final HRegion r) {
632 this.region = r;
633 this.createTime = System.currentTimeMillis();
634 this.whenToExpire = this.createTime;
635 }
636
637
638
639
640
641 public boolean isMaximumWait(final long maximumWait) {
642 return (System.currentTimeMillis() - this.createTime) > maximumWait;
643 }
644
645
646
647
648
649 public int getRequeueCount() {
650 return this.requeueCount;
651 }
652
653
654
655
656
657
658
659 public FlushRegionEntry requeue(final long when) {
660 this.whenToExpire = System.currentTimeMillis() + when;
661 this.requeueCount++;
662 return this;
663 }
664
665 @Override
666 public long getDelay(TimeUnit unit) {
667 return unit.convert(this.whenToExpire - System.currentTimeMillis(),
668 TimeUnit.MILLISECONDS);
669 }
670
671 @Override
672 public int compareTo(Delayed other) {
673
674 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
675 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
676 if (ret != 0) {
677 return ret;
678 }
679 FlushQueueEntry otherEntry = (FlushQueueEntry) other;
680 return hashCode() - otherEntry.hashCode();
681 }
682
683 @Override
684 public String toString() {
685 return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
686 }
687
688 @Override
689 public int hashCode() {
690 int hash = (int) getDelay(TimeUnit.MILLISECONDS);
691 return hash ^ region.hashCode();
692 }
693
694 @Override
695 public boolean equals(Object obj) {
696 if (this == obj) {
697 return true;
698 }
699 if (obj == null || getClass() != obj.getClass()) {
700 return false;
701 }
702 Delayed other = (Delayed) obj;
703 return compareTo(other) == 0;
704 }
705 }
706 }