1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.balancer;
19
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.Random;
28 import java.util.TreeMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.master.AssignmentManager;
37 import org.apache.hadoop.hbase.master.RegionPlan;
38
39 import com.google.common.collect.MinMaxPriorityQueue;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
58 public class SimpleLoadBalancer extends BaseLoadBalancer {
59 private static final Log LOG = LogFactory.getLog(SimpleLoadBalancer.class);
60 private static final Random RANDOM = new Random(System.currentTimeMillis());
61
62 private RegionInfoComparator riComparator = new RegionInfoComparator();
63 private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
64
65
66
67
68
69
70
71
72
73
74 static class BalanceInfo {
75
76 private final int nextRegionForUnload;
77 private int numRegionsAdded;
78
79 public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
80 this.nextRegionForUnload = nextRegionForUnload;
81 this.numRegionsAdded = numRegionsAdded;
82 }
83
84 int getNextRegionForUnload() {
85 return nextRegionForUnload;
86 }
87
88 int getNumRegionsAdded() {
89 return numRegionsAdded;
90 }
91
92 void setNumRegionsAdded(int numAdded) {
93 this.numRegionsAdded = numAdded;
94 }
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 public List<RegionPlan> balanceCluster(
183 Map<ServerName, List<HRegionInfo>> clusterMap) {
184 boolean emptyRegionServerPresent = false;
185 long startTime = System.currentTimeMillis();
186
187 ClusterLoadState cs = new ClusterLoadState(clusterMap);
188
189 if (!this.needsBalance(cs)) return null;
190
191 int numServers = cs.getNumServers();
192 NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
193 int numRegions = cs.getNumRegions();
194 int min = numRegions / numServers;
195 int max = numRegions % numServers == 0 ? min : min + 1;
196
197
198 StringBuilder strBalanceParam = new StringBuilder();
199 strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
200 .append(", numServers=").append(numServers).append(", max=").append(max)
201 .append(", min=").append(min);
202 LOG.debug(strBalanceParam.toString());
203
204
205
206 MinMaxPriorityQueue<RegionPlan> regionsToMove =
207 MinMaxPriorityQueue.orderedBy(rpComparator).create();
208 List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
209
210
211 int serversOverloaded = 0;
212
213 boolean fetchFromTail = false;
214 Map<ServerName, BalanceInfo> serverBalanceInfo =
215 new TreeMap<ServerName, BalanceInfo>();
216 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
217 serversByLoad.descendingMap().entrySet()) {
218 ServerAndLoad sal = server.getKey();
219 int regionCount = sal.getLoad();
220 if (regionCount <= max) {
221 serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
222 break;
223 }
224 serversOverloaded++;
225 List<HRegionInfo> regions = server.getValue();
226 int numToOffload = Math.min(regionCount - max, regions.size());
227
228
229 Collections.sort(regions, riComparator);
230 int numTaken = 0;
231 for (int i = 0; i <= numToOffload; ) {
232 HRegionInfo hri = regions.get(i);
233 if (fetchFromTail) {
234 hri = regions.get(regions.size() - 1 - i);
235 }
236 i++;
237
238 if (hri.isMetaRegion()) continue;
239 regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
240 numTaken++;
241 if (numTaken >= numToOffload) break;
242
243 if (emptyRegionServerPresent) {
244 fetchFromTail = !fetchFromTail;
245 }
246 }
247 serverBalanceInfo.put(sal.getServerName(),
248 new BalanceInfo(numToOffload, (-1)*numTaken));
249 }
250 int totalNumMoved = regionsToMove.size();
251
252
253 int neededRegions = 0;
254 fetchFromTail = false;
255
256 Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
257 float average = (float)numRegions / numServers;
258 int maxToTake = numRegions - (int)average;
259 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
260 serversByLoad.entrySet()) {
261 if (maxToTake == 0) break;
262 int regionCount = server.getKey().getLoad();
263 if (regionCount >= min && regionCount > 0) {
264 continue;
265 }
266 int regionsToPut = min - regionCount;
267 if (regionsToPut == 0)
268 {
269 regionsToPut = 1;
270 }
271 maxToTake -= regionsToPut;
272 underloadedServers.put(server.getKey().getServerName(), regionsToPut);
273 }
274
275 int serversUnderloaded = underloadedServers.size();
276 int incr = 1;
277 List<ServerName> sns =
278 Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
279 Collections.shuffle(sns, RANDOM);
280 while (regionsToMove.size() > 0) {
281 int cnt = 0;
282 int i = incr > 0 ? 0 : underloadedServers.size()-1;
283 for (; i >= 0 && i < underloadedServers.size(); i += incr) {
284 if (regionsToMove.isEmpty()) break;
285 ServerName si = sns.get(i);
286 int numToTake = underloadedServers.get(si);
287 if (numToTake == 0) continue;
288
289 addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
290 if (emptyRegionServerPresent) {
291 fetchFromTail = !fetchFromTail;
292 }
293
294 underloadedServers.put(si, numToTake-1);
295 cnt++;
296 BalanceInfo bi = serverBalanceInfo.get(si);
297 if (bi == null) {
298 bi = new BalanceInfo(0, 0);
299 serverBalanceInfo.put(si, bi);
300 }
301 bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
302 }
303 if (cnt == 0) break;
304
305 incr = -incr;
306 }
307 for (Integer i : underloadedServers.values()) {
308
309 neededRegions += i;
310 }
311
312
313
314 if (neededRegions == 0 && regionsToMove.isEmpty()) {
315 long endTime = System.currentTimeMillis();
316 LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
317 "Moving " + totalNumMoved + " regions off of " +
318 serversOverloaded + " overloaded servers onto " +
319 serversUnderloaded + " less loaded servers");
320 return regionsToReturn;
321 }
322
323
324
325
326
327 if (neededRegions != 0) {
328
329 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
330 serversByLoad.descendingMap().entrySet()) {
331 BalanceInfo balanceInfo =
332 serverBalanceInfo.get(server.getKey().getServerName());
333 int idx =
334 balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
335 if (idx >= server.getValue().size()) break;
336 HRegionInfo region = server.getValue().get(idx);
337 if (region.isMetaRegion()) continue;
338 regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
339 totalNumMoved++;
340 if (--neededRegions == 0) {
341
342 break;
343 }
344 }
345 }
346
347
348
349
350
351 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
352 serversByLoad.entrySet()) {
353 int regionCount = server.getKey().getLoad();
354 if (regionCount >= min) break;
355 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
356 if(balanceInfo != null) {
357 regionCount += balanceInfo.getNumRegionsAdded();
358 }
359 if(regionCount >= min) {
360 continue;
361 }
362 int numToTake = min - regionCount;
363 int numTaken = 0;
364 while(numTaken < numToTake && 0 < regionsToMove.size()) {
365 addRegionPlan(regionsToMove, fetchFromTail,
366 server.getKey().getServerName(), regionsToReturn);
367 numTaken++;
368 if (emptyRegionServerPresent) {
369 fetchFromTail = !fetchFromTail;
370 }
371 }
372 }
373
374
375 if (0 < regionsToMove.size()) {
376 for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
377 serversByLoad.entrySet()) {
378 int regionCount = server.getKey().getLoad();
379 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
380 if(balanceInfo != null) {
381 regionCount += balanceInfo.getNumRegionsAdded();
382 }
383 if(regionCount >= max) {
384 break;
385 }
386 addRegionPlan(regionsToMove, fetchFromTail,
387 server.getKey().getServerName(), regionsToReturn);
388 if (emptyRegionServerPresent) {
389 fetchFromTail = !fetchFromTail;
390 }
391 if (regionsToMove.isEmpty()) {
392 break;
393 }
394 }
395 }
396
397 long endTime = System.currentTimeMillis();
398
399 if (!regionsToMove.isEmpty() || neededRegions != 0) {
400
401 LOG.warn("regionsToMove=" + totalNumMoved +
402 ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
403 ", serversUnderloaded=" + serversUnderloaded);
404 StringBuilder sb = new StringBuilder();
405 for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterMap.entrySet()) {
406 if (sb.length() > 0) sb.append(", ");
407 sb.append(e.getKey().toString());
408 sb.append(" ");
409 sb.append(e.getValue().size());
410 }
411 LOG.warn("Input " + sb.toString());
412 }
413
414
415 LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
416 "Moving " + totalNumMoved + " regions off of " +
417 serversOverloaded + " overloaded servers onto " +
418 serversUnderloaded + " less loaded servers");
419
420 return regionsToReturn;
421 }
422
423
424
425
426 private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
427 final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
428 RegionPlan rp = null;
429 if (!fetchFromTail) rp = regionsToMove.remove();
430 else rp = regionsToMove.removeLast();
431 rp.setDestination(sn);
432 regionsToReturn.add(rp);
433 }
434 }