View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.RemoteExceptionHandler;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
42  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
43  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
44  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  import org.apache.hadoop.hbase.util.Pair;
47  import org.apache.hadoop.util.StringUtils;
48  
49  import com.google.common.annotations.VisibleForTesting;
50  import com.google.common.base.Preconditions;
51  
52  /**
53   * Compact region on request and then run split if appropriate
54   */
55  @InterfaceAudience.Private
56  public class CompactSplitThread implements CompactionRequestor {
57    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
58  
59    public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
60        "hbase.regionserver.regionSplitLimit";
61    public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
62    
63    private final HRegionServer server;
64    private final Configuration conf;
65  
66    private final ThreadPoolExecutor largeCompactions;
67    private final ThreadPoolExecutor smallCompactions;
68    private final ThreadPoolExecutor splits;
69    private final ThreadPoolExecutor mergePool;
70  
71    private final CompactionThroughputController compactionThroughputController;
72  
73    /**
74     * Splitting should not take place if the total number of regions exceed this.
75     * This is not a hard limit to the number of regions but it is a guideline to
76     * stop splitting after number of online regions is greater than this.
77     */
78    private int regionSplitLimit;
79  
80    /** @param server */
81    CompactSplitThread(HRegionServer server) {
82      super();
83      this.server = server;
84      this.conf = server.getConfiguration();
85      this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
86          DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
87  
88      int largeThreads = Math.max(1, conf.getInt(
89          "hbase.regionserver.thread.compaction.large", 1));
90      int smallThreads = conf.getInt(
91          "hbase.regionserver.thread.compaction.small", 1);
92  
93      int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
94  
95      // if we have throttle threads, make sure the user also specified size
96      Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
97  
98      final String n = Thread.currentThread().getName();
99  
100     this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
101         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
102         new ThreadFactory() {
103           @Override
104           public Thread newThread(Runnable r) {
105             Thread t = new Thread(r);
106             t.setName(n + "-largeCompactions-" + System.currentTimeMillis());
107             return t;
108           }
109       });
110     this.largeCompactions.setRejectedExecutionHandler(new Rejection());
111     this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
112         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
113         new ThreadFactory() {
114           @Override
115           public Thread newThread(Runnable r) {
116             Thread t = new Thread(r);
117             t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
118             return t;
119           }
120       });
121     this.smallCompactions
122         .setRejectedExecutionHandler(new Rejection());
123     this.splits = (ThreadPoolExecutor)
124         Executors.newFixedThreadPool(splitThreads,
125             new ThreadFactory() {
126           @Override
127           public Thread newThread(Runnable r) {
128             Thread t = new Thread(r);
129             t.setName(n + "-splits-" + System.currentTimeMillis());
130             return t;
131           }
132       });
133     int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
134     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
135         mergeThreads, new ThreadFactory() {
136           @Override
137           public Thread newThread(Runnable r) {
138             Thread t = new Thread(r);
139             t.setName(n + "-merges-" + System.currentTimeMillis());
140             return t;
141           }
142         });
143 
144     // compaction throughput controller
145     this.compactionThroughputController =
146         CompactionThroughputControllerFactory.create(server, conf);
147   }
148 
149   @Override
150   public String toString() {
151     return "compaction_queue=("
152         + largeCompactions.getQueue().size() + ":"
153         + smallCompactions.getQueue().size() + ")"
154         + ", split_queue=" + splits.getQueue().size()
155         + ", merge_queue=" + mergePool.getQueue().size();
156   }
157   
158   public String dumpQueue() {
159     StringBuffer queueLists = new StringBuffer();
160     queueLists.append("Compaction/Split Queue dump:\n");
161     queueLists.append("  LargeCompation Queue:\n");
162     BlockingQueue<Runnable> lq = largeCompactions.getQueue();
163     Iterator<Runnable> it = lq.iterator();
164     while (it.hasNext()) {
165       queueLists.append("    " + it.next().toString());
166       queueLists.append("\n");
167     }
168 
169     if (smallCompactions != null) {
170       queueLists.append("\n");
171       queueLists.append("  SmallCompation Queue:\n");
172       lq = smallCompactions.getQueue();
173       it = lq.iterator();
174       while (it.hasNext()) {
175         queueLists.append("    " + it.next().toString());
176         queueLists.append("\n");
177       }
178     }
179 
180     queueLists.append("\n");
181     queueLists.append("  Split Queue:\n");
182     lq = splits.getQueue();
183     it = lq.iterator();
184     while (it.hasNext()) {
185       queueLists.append("    " + it.next().toString());
186       queueLists.append("\n");
187     }
188 
189     queueLists.append("\n");
190     queueLists.append("  Region Merge Queue:\n");
191     lq = mergePool.getQueue();
192     it = lq.iterator();
193     while (it.hasNext()) {
194       queueLists.append("    " + it.next().toString());
195       queueLists.append("\n");
196     }
197 
198     return queueLists.toString();
199   }
200 
201   public synchronized void requestRegionsMerge(final HRegion a,
202       final HRegion b, final boolean forcible) {
203     try {
204       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
205       if (LOG.isDebugEnabled()) {
206         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
207             + forcible + ".  " + this);
208       }
209     } catch (RejectedExecutionException ree) {
210       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
211           + forcible, ree);
212     }
213   }
214 
215   public synchronized boolean requestSplit(final HRegion r) {
216     // don't split regions that are blocking
217     if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
218       byte[] midKey = r.checkSplit();
219       if (midKey != null) {
220         requestSplit(r, midKey);
221         return true;
222       }
223     }
224     return false;
225   }
226 
227   public synchronized void requestSplit(final HRegion r, byte[] midKey) {
228     if (midKey == null) {
229       LOG.debug("Region " + r.getRegionNameAsString() +
230         " not splittable because midkey=null");
231       if (r.shouldForceSplit()) {
232         r.clearSplit();
233       }
234       return;
235     }
236     try {
237       this.splits.execute(new SplitRequest(r, midKey, this.server));
238       if (LOG.isDebugEnabled()) {
239         LOG.debug("Split requested for " + r + ".  " + this);
240       }
241     } catch (RejectedExecutionException ree) {
242       LOG.info("Could not execute split for " + r, ree);
243     }
244   }
245 
246   @Override
247   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
248       throws IOException {
249     return requestCompaction(r, why, null);
250   }
251 
252   @Override
253   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
254       List<Pair<CompactionRequest, Store>> requests) throws IOException {
255     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
256   }
257 
258   @Override
259   public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
260       final String why, CompactionRequest request) throws IOException {
261     return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
262   }
263 
264   @Override
265   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
266       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
267     return requestCompactionInternal(r, why, p, requests, true);
268   }
269 
270   private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
271       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
272     // not a special compaction request, so make our own list
273     List<CompactionRequest> ret = null;
274     if (requests == null) {
275       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
276       for (Store s : r.getStores().values()) {
277         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
278         if (selectNow) ret.add(cr);
279       }
280     } else {
281       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
282       ret = new ArrayList<CompactionRequest>(requests.size());
283       for (Pair<CompactionRequest, Store> pair : requests) {
284         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
285       }
286     }
287     return ret;
288   }
289 
290   public CompactionRequest requestCompaction(final HRegion r, final Store s,
291       final String why, int priority, CompactionRequest request) throws IOException {
292     return requestCompactionInternal(r, s, why, priority, request, true);
293   }
294 
295   public synchronized void requestSystemCompaction(
296       final HRegion r, final String why) throws IOException {
297     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
298   }
299 
300   public void requestSystemCompaction(
301       final HRegion r, final Store s, final String why) throws IOException {
302     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
303   }
304 
305   /**
306    * @param r HRegion store belongs to
307    * @param s Store to request compaction on
308    * @param why Why compaction requested -- used in debug messages
309    * @param priority override the default priority (NO_PRIORITY == decide)
310    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
311    *          compaction will be used.
312    */
313   private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
314       final String why, int priority, CompactionRequest request, boolean selectNow)
315           throws IOException {
316     if (this.server.isStopped()
317         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
318       return null;
319     }
320 
321     CompactionContext compaction = null;
322     if (selectNow) {
323       compaction = selectCompaction(r, s, priority, request);
324       if (compaction == null) return null; // message logged inside
325     }
326 
327     // We assume that most compactions are small. So, put system compactions into small
328     // pool; we will do selection there, and move to large pool if necessary.
329     long size = selectNow ? compaction.getRequest().getSize() : 0;
330     ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
331       ? largeCompactions : smallCompactions;
332     pool.execute(new CompactionRunner(s, r, compaction, pool));
333     if (LOG.isDebugEnabled()) {
334       String type = (pool == smallCompactions) ? "Small " : "Large ";
335       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
336           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
337     }
338     return selectNow ? compaction.getRequest() : null;
339   }
340 
341   private CompactionContext selectCompaction(final HRegion r, final Store s,
342       int priority, CompactionRequest request) throws IOException {
343     CompactionContext compaction = s.requestCompaction(priority, request);
344     if (compaction == null) {
345       if(LOG.isDebugEnabled()) {
346         LOG.debug("Not compacting " + r.getRegionNameAsString() +
347             " because compaction request was cancelled");
348       }
349       return null;
350     }
351     assert compaction.hasSelection();
352     if (priority != Store.NO_PRIORITY) {
353       compaction.getRequest().setPriority(priority);
354     }
355     return compaction;
356   }
357 
358   /**
359    * Only interrupt once it's done with a run through the work loop.
360    */
361   void interruptIfNecessary() {
362     splits.shutdown();
363     mergePool.shutdown();
364     largeCompactions.shutdown();
365     smallCompactions.shutdown();
366   }
367 
368   private void waitFor(ThreadPoolExecutor t, String name) {
369     boolean done = false;
370     while (!done) {
371       try {
372         done = t.awaitTermination(60, TimeUnit.SECONDS);
373         LOG.info("Waiting for " + name + " to finish...");
374         if (!done) {
375           t.shutdownNow();
376         }
377       } catch (InterruptedException ie) {
378         LOG.warn("Interrupted waiting for " + name + " to finish...");
379       }
380     }
381   }
382 
383   void join() {
384     waitFor(splits, "Split Thread");
385     waitFor(mergePool, "Merge Thread");
386     waitFor(largeCompactions, "Large Compaction Thread");
387     waitFor(smallCompactions, "Small Compaction Thread");
388   }
389 
390   /**
391    * Returns the current size of the queue containing regions that are
392    * processed.
393    *
394    * @return The current size of the regions queue.
395    */
396   public int getCompactionQueueSize() {
397     return largeCompactions.getQueue().size() + smallCompactions.getQueue().size();
398   }
399 
400   public int getLargeCompactionQueueSize() {
401     return largeCompactions.getQueue().size();
402   }
403 
404 
405   public int getSmallCompactionQueueSize() {
406     return smallCompactions.getQueue().size();
407   }
408 
409   public int getSplitQueueSize() {
410     return splits.getQueue().size();
411   }
412 
413   private boolean shouldSplitRegion() {
414     if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
415       LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
416           + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
417     }
418     return (regionSplitLimit > server.getNumberOfOnlineRegions());
419   }
420 
421   /**
422    * @return the regionSplitLimit
423    */
424   public int getRegionSplitLimit() {
425     return this.regionSplitLimit;
426   }
427 
428   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
429     private final Store store;
430     private final HRegion region;
431     private CompactionContext compaction;
432     private int queuedPriority;
433     private ThreadPoolExecutor parent;
434 
435     public CompactionRunner(Store store, HRegion region,
436         CompactionContext compaction, ThreadPoolExecutor parent) {
437       super();
438       this.store = store;
439       this.region = region;
440       this.compaction = compaction;
441       this.queuedPriority = (this.compaction == null)
442           ? store.getCompactPriority() : compaction.getRequest().getPriority();
443       this.parent = parent;
444     }
445 
446     @Override
447     public String toString() {
448       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
449           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
450     }
451 
452     @Override
453     public void run() {
454       Preconditions.checkNotNull(server);
455       if (server.isStopped()
456           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
457         return;
458       }
459       // Common case - system compaction without a file selection. Select now.
460       if (this.compaction == null) {
461         int oldPriority = this.queuedPriority;
462         this.queuedPriority = this.store.getCompactPriority();
463         if (this.queuedPriority > oldPriority) {
464           // Store priority decreased while we were in queue (due to some other compaction?),
465           // requeue with new priority to avoid blocking potential higher priorities.
466           this.parent.execute(this);
467           return;
468         }
469         try {
470           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
471         } catch (IOException ex) {
472           LOG.error("Compaction selection failed " + this, ex);
473           server.checkFileSystem();
474           return;
475         }
476         if (this.compaction == null) return; // nothing to do
477         // Now see if we are in correct pool for the size; if not, go to the correct one.
478         // We might end up waiting for a while, so cancel the selection.
479         assert this.compaction.hasSelection();
480         ThreadPoolExecutor pool = store.throttleCompaction(
481             compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
482         if (this.parent != pool) {
483           this.store.cancelRequestedCompaction(this.compaction);
484           this.compaction = null;
485           this.parent = pool;
486           this.parent.execute(this);
487           return;
488         }
489       }
490       // Finally we can compact something.
491       assert this.compaction != null;
492 
493       this.compaction.getRequest().beforeExecute();
494       try {
495         // Note: please don't put single-compaction logic here;
496         //       put it into region/store/etc. This is CST logic.
497         long start = EnvironmentEdgeManager.currentTimeMillis();
498         boolean completed = region.compact(compaction, store, compactionThroughputController);
499         long now = EnvironmentEdgeManager.currentTimeMillis();
500         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
501               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
502         if (completed) {
503           // degenerate case: blocked regions require recursive enqueues
504           if (store.getCompactPriority() <= 0) {
505             requestSystemCompaction(region, store, "Recursive enqueue");
506           } else {
507             // see if the compaction has caused us to exceed max region size
508             requestSplit(region);
509           }
510         }
511       } catch (IOException ex) {
512         IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
513         LOG.error("Compaction failed " + this, remoteEx);
514         if (remoteEx != ex) {
515           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
516         }
517         server.checkFileSystem();
518       } catch (Exception ex) {
519         LOG.error("Compaction failed " + this, ex);
520         server.checkFileSystem();
521       } finally {
522         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
523       }
524       this.compaction.getRequest().afterExecute();
525     }
526 
527     private String formatStackTrace(Exception ex) {
528       StringWriter sw = new StringWriter();
529       PrintWriter pw = new PrintWriter(sw);
530       ex.printStackTrace(pw);
531       pw.flush();
532       return sw.toString();
533     }
534 
535     @Override
536     public int compareTo(CompactionRunner o) {
537       // Only compare the underlying request (if any), for queue sorting purposes.
538       int compareVal = queuedPriority - o.queuedPriority; // compare priority
539       if (compareVal != 0) return compareVal;
540       CompactionContext tc = this.compaction, oc = o.compaction;
541       // Sort pre-selected (user?) compactions before system ones with equal priority.
542       return (tc == null) ? ((oc == null) ? 0 : 1)
543           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
544     }
545   }
546 
547   /**
548    * Cleanup class to use when rejecting a compaction request from the queue.
549    */
550   private static class Rejection implements RejectedExecutionHandler {
551     @Override
552     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
553       if (runnable instanceof CompactionRunner) {
554         CompactionRunner runner = (CompactionRunner)runnable;
555         LOG.debug("Compaction Rejected: " + runner);
556         runner.store.cancelRequestedCompaction(runner.compaction);
557       }
558     }
559   }
560 
561   @VisibleForTesting
562   public CompactionThroughputController getCompactionThroughputController() {
563     return compactionThroughputController;
564   }
565 
566 }