View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.balancer;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableMap;
27  import java.util.Random;
28  import java.util.TreeMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.ServerName;
36  import org.apache.hadoop.hbase.master.AssignmentManager;
37  import org.apache.hadoop.hbase.master.RegionPlan;
38  
39  import com.google.common.collect.MinMaxPriorityQueue;
40  
41  /**
42   * Makes decisions about the placement and movement of Regions across
43   * RegionServers.
44   *
45   * <p>Cluster-wide load balancing will occur only when there are no regions in
46   * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
47   *
48   * <p>Inline region placement with {@link #immediateAssignment} can be used when
49   * the Master needs to handle closed regions that it currently does not have
50   * a destination set for.  This can happen during master failover.
51   *
52   * <p>On cluster startup, bulk assignment can be used to determine
53   * locations for all Regions in a cluster.
54   *
55   * <p>This classes produces plans for the {@link AssignmentManager} to execute.
56   */
57  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
58  public class SimpleLoadBalancer extends BaseLoadBalancer {
59    private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
60    private static final Random RANDOM = new Random(System.currentTimeMillis());
61  
62    private RegionInfoComparator riComparator = new RegionInfoComparator();
63    private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
64  
65  
66    /**
67     * Stores additional per-server information about the regions added/removed
68     * during the run of the balancing algorithm.
69     *
70     * For servers that shed regions, we need to track which regions we have already
71     * shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
72     * the server that is the next to be shed.
73     */
74    static class BalanceInfo {
75  
76      private final int nextRegionForUnload;
77      private int numRegionsAdded;
78  
79      public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
80        this.nextRegionForUnload = nextRegionForUnload;
81        this.numRegionsAdded = numRegionsAdded;
82      }
83  
84      int getNextRegionForUnload() {
85        return nextRegionForUnload;
86      }
87  
88      int getNumRegionsAdded() {
89        return numRegionsAdded;
90      }
91  
92      void setNumRegionsAdded(int numAdded) {
93        this.numRegionsAdded = numAdded;
94      }
95    }
96  
97    /**
98     * Generate a global load balancing plan according to the specified map of
99     * server information to the most loaded regions of each server.
100    *
101    * The load balancing invariant is that all servers are within 1 region of the
102    * average number of regions per server.  If the average is an integer number,
103    * all servers will be balanced to the average.  Otherwise, all servers will
104    * have either floor(average) or ceiling(average) regions.
105    *
106    * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
107    *   we can fetch from both ends of the queue. 
108    * At the beginning, we check whether there was empty region server 
109    *   just discovered by Master. If so, we alternately choose new / old
110    *   regions from head / tail of regionsToMove, respectively. This alternation
111    *   avoids clustering young regions on the newly discovered region server.
112    *   Otherwise, we choose new regions from head of regionsToMove.
113    *   
114    * Another improvement from HBASE-3609 is that we assign regions from
115    *   regionsToMove to underloaded servers in round-robin fashion.
116    *   Previously one underloaded server would be filled before we move onto
117    *   the next underloaded server, leading to clustering of young regions.
118    *   
119    * Finally, we randomly shuffle underloaded servers so that they receive
120    *   offloaded regions relatively evenly across calls to balanceCluster().
121    *         
122    * The algorithm is currently implemented as such:
123    *
124    * <ol>
125    * <li>Determine the two valid numbers of regions each server should have,
126    *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
127    *
128    * <li>Iterate down the most loaded servers, shedding regions from each so
129    *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
130    *     server that already has &lt;= <b>MAX</b> regions.
131    *     <p>
132    *     Order the regions to move from most recent to least.
133    *
134    * <li>Iterate down the least loaded servers, assigning regions so each server
135    *     has exactly </b>MIN</b> regions.  Stop once you reach a server that
136    *     already has &gt;= <b>MIN</b> regions.
137    *
138    *     Regions being assigned to underloaded servers are those that were shed
139    *     in the previous step.  It is possible that there were not enough
140    *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
141    *     end up with a number of regions required to do so, <b>neededRegions</b>.
142    *
143    *     It is also possible that we were able to fill each underloaded but ended
144    *     up with regions that were unassigned from overloaded servers but that
145    *     still do not have assignment.
146    *
147    *     If neither of these conditions hold (no regions needed to fill the
148    *     underloaded servers, no regions leftover from overloaded servers),
149    *     we are done and return.  Otherwise we handle these cases below.
150    *
151    * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
152    *     we iterate the most loaded servers again, shedding a single server from
153    *     each (this brings them from having <b>MAX</b> regions to having
154    *     <b>MIN</b> regions).
155    *
156    * <li>We now definitely have more regions that need assignment, either from
157    *     the previous step or from the original shedding from overloaded servers.
158    *     Iterate the least loaded servers filling each to <b>MIN</b>.
159    *
160    * <li>If we still have more regions that need assignment, again iterate the
161    *     least loaded servers, this time giving each one (filling them to
162    *     </b>MAX</b>) until we run out.
163    *
164    * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
165    *
166    *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
167    *     to end up with <b>MAX</b> regions at the end of the balancing.  This
168    *     ensures the minimal number of regions possible are moved.
169    * </ol>
170    *
171    * TODO: We can at-most reassign the number of regions away from a particular
172    *       server to be how many they report as most loaded.
173    *       Should we just keep all assignment in memory?  Any objections?
174    *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
175    *       (current thinking is we will hold all assignments in memory)
176    *
177    * @param clusterMap Map of regionservers and their load/region information to
178    *                   a list of their most loaded regions
179    * @return a list of regions to be moved, including source and destination,
180    *         or null if cluster is already balanced
181    */
182   public List<RegionPlan> balanceCluster(
183       Map<ServerName, List<HRegionInfo>> clusterMap) {
184     boolean emptyRegionServerPresent = false;
185     long startTime = System.currentTimeMillis();
186 
187     ClusterLoadState cs = new ClusterLoadState(clusterMap);
188 
189     if (!this.needsBalance(cs)) return null;
190     
191     int numServers = cs.getNumServers();
192     NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
193     int numRegions = cs.getNumRegions();
194     int min = numRegions / numServers;
195     int max = numRegions % numServers == 0 ? min : min + 1;
196 
197     // Using to check balance result.
198     StringBuilder strBalanceParam = new StringBuilder();
199     strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
200         .append(", numServers=").append(numServers).append(", max=").append(max)
201         .append(", min=").append(min);
202     LOG.debug(strBalanceParam.toString());
203 
204     // Balance the cluster
205     // TODO: Look at data block locality or a more complex load to do this
206     MinMaxPriorityQueue<RegionPlan> regionsToMove =
207       MinMaxPriorityQueue.orderedBy(rpComparator).create();
208     List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
209 
210     // Walk down most loaded, pruning each to the max
211     int serversOverloaded = 0;
212     // flag used to fetch regions from head and tail of list, alternately
213     boolean fetchFromTail = false;
214     Map<ServerName, BalanceInfo> serverBalanceInfo =
215       new TreeMap<ServerName, BalanceInfo>();
216     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
217         serversByLoad.descendingMap().entrySet()) {
218       ServerAndLoad sal = server.getKey();
219       int regionCount = sal.getLoad();
220       if (regionCount <= max) {
221         serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
222         break;
223       }
224       serversOverloaded++;
225       List<HRegionInfo> regions = server.getValue();
226       int numToOffload = Math.min(regionCount - max, regions.size());
227       // account for the out-of-band regions which were assigned to this server
228       // after some other region server crashed 
229       Collections.sort(regions, riComparator);
230       int numTaken = 0;
231       for (int i = 0; i <= numToOffload; ) {
232         HRegionInfo hri = regions.get(i); // fetch from head
233         if (fetchFromTail) {
234           hri = regions.get(regions.size() - 1 - i);
235         }
236         i++;
237         // Don't rebalance meta regions.
238         if (hri.isMetaRegion()) continue;
239         regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
240         numTaken++;
241         if (numTaken >= numToOffload) break;
242         // fetch in alternate order if there is new region server
243         if (emptyRegionServerPresent) {
244           fetchFromTail = !fetchFromTail;
245         }
246       }
247       serverBalanceInfo.put(sal.getServerName(),
248         new BalanceInfo(numToOffload, (-1)*numTaken));
249     }
250     int totalNumMoved = regionsToMove.size();
251 
252     // Walk down least loaded, filling each to the min
253     int neededRegions = 0; // number of regions needed to bring all up to min
254     fetchFromTail = false;
255 
256     Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
257     float average = (float)numRegions / numServers; // for logging
258     int maxToTake = numRegions - (int)average;
259     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
260         serversByLoad.entrySet()) {
261       if (maxToTake == 0) break; // no more to take
262       int regionCount = server.getKey().getLoad();
263       if (regionCount >= min && regionCount > 0) {
264         continue; // look for other servers which haven't reached min
265       }
266       int regionsToPut = min - regionCount;
267       if (regionsToPut == 0)
268       {
269         regionsToPut = 1;
270       }
271       maxToTake -= regionsToPut;
272       underloadedServers.put(server.getKey().getServerName(), regionsToPut);
273     }
274     // number of servers that get new regions
275     int serversUnderloaded = underloadedServers.size();
276     int incr = 1;
277     List<ServerName> sns =
278       Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
279     Collections.shuffle(sns, RANDOM);
280     while (regionsToMove.size() > 0) {
281       int cnt = 0;
282       int i = incr > 0 ? 0 : underloadedServers.size()-1;
283       for (; i >= 0 && i < underloadedServers.size(); i += incr) {
284         if (regionsToMove.isEmpty()) break;
285         ServerName si = sns.get(i);
286         int numToTake = underloadedServers.get(si);
287         if (numToTake == 0) continue;
288 
289         addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
290         if (emptyRegionServerPresent) {
291           fetchFromTail = !fetchFromTail;
292         }
293 
294         underloadedServers.put(si, numToTake-1);
295         cnt++;
296         BalanceInfo bi = serverBalanceInfo.get(si);
297         if (bi == null) {
298           bi = new BalanceInfo(0, 0);
299           serverBalanceInfo.put(si, bi);
300         }
301         bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
302       }
303       if (cnt == 0) break;
304       // iterates underloadedServers in the other direction
305       incr = -incr;
306     }
307     for (Integer i : underloadedServers.values()) {
308       // If we still want to take some, increment needed
309       neededRegions += i;
310     }
311 
312     // If none needed to fill all to min and none left to drain all to max,
313     // we are done
314     if (neededRegions == 0 && regionsToMove.isEmpty()) {
315       long endTime = System.currentTimeMillis();
316       LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
317           "Moving " + totalNumMoved + " regions off of " +
318           serversOverloaded + " overloaded servers onto " +
319           serversUnderloaded + " less loaded servers");
320       return regionsToReturn;
321     }
322 
323     // Need to do a second pass.
324     // Either more regions to assign out or servers that are still underloaded
325 
326     // If we need more to fill min, grab one from each most loaded until enough
327     if (neededRegions != 0) {
328       // Walk down most loaded, grabbing one from each until we get enough
329       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
330         serversByLoad.descendingMap().entrySet()) {
331         BalanceInfo balanceInfo =
332           serverBalanceInfo.get(server.getKey().getServerName());
333         int idx =
334           balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
335         if (idx >= server.getValue().size()) break;
336         HRegionInfo region = server.getValue().get(idx);
337         if (region.isMetaRegion()) continue; // Don't move meta regions.
338         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
339         totalNumMoved++;
340         if (--neededRegions == 0) {
341           // No more regions needed, done shedding
342           break;
343         }
344       }
345     }
346 
347     // Now we have a set of regions that must be all assigned out
348     // Assign each underloaded up to the min, then if leftovers, assign to max
349 
350     // Walk down least loaded, assigning to each to fill up to min
351     for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
352         serversByLoad.entrySet()) {
353       int regionCount = server.getKey().getLoad();
354       if (regionCount >= min) break;
355       BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
356       if(balanceInfo != null) {
357         regionCount += balanceInfo.getNumRegionsAdded();
358       }
359       if(regionCount >= min) {
360         continue;
361       }
362       int numToTake = min - regionCount;
363       int numTaken = 0;
364       while(numTaken < numToTake && 0 < regionsToMove.size()) {
365         addRegionPlan(regionsToMove, fetchFromTail,
366           server.getKey().getServerName(), regionsToReturn);
367         numTaken++;
368         if (emptyRegionServerPresent) {
369           fetchFromTail = !fetchFromTail;
370         }
371       }
372     }
373 
374     // If we still have regions to dish out, assign underloaded to max
375     if (0 < regionsToMove.size()) {
376       for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
377         serversByLoad.entrySet()) {
378         int regionCount = server.getKey().getLoad();
379         BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
380         if(balanceInfo != null) {
381           regionCount += balanceInfo.getNumRegionsAdded();
382         }
383         if(regionCount >= max) {
384           break;
385         }
386         addRegionPlan(regionsToMove, fetchFromTail,
387           server.getKey().getServerName(), regionsToReturn);
388         if (emptyRegionServerPresent) {
389           fetchFromTail = !fetchFromTail;
390         }
391         if (regionsToMove.isEmpty()) {
392           break;
393         }
394       }
395     }
396 
397     long endTime = System.currentTimeMillis();
398 
399     if (!regionsToMove.isEmpty() || neededRegions != 0) {
400       // Emit data so can diagnose how balancer went astray.
401       LOG.warn("regionsToMove=" + totalNumMoved +
402         ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
403         ", serversUnderloaded=" + serversUnderloaded);
404       StringBuilder sb = new StringBuilder();
405       for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
406         if (sb.length() > 0) sb.append(", ");
407         sb.append(e.getKey().toString());
408         sb.append(" ");
409         sb.append(e.getValue().size());
410       }
411       LOG.warn("Input " + sb.toString());
412     }
413 
414     // All done!
415     LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
416         "Moving " + totalNumMoved + " regions off of " +
417         serversOverloaded + " overloaded servers onto " +
418         serversUnderloaded + " less loaded servers");
419 
420     return regionsToReturn;
421   }
422 
423   /**
424    * Add a region from the head or tail to the List of regions to return.
425    */
426   private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
427       final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
428     RegionPlan rp = null;
429     if (!fetchFromTail) rp = regionsToMove.remove();
430     else rp = regionsToMove.removeLast();
431     rp.setDestination(sn);
432     regionsToReturn.add(rp);
433   }
434 }