/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.Triple;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;

/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers.  Processes the startups,
 * shutdowns, and deaths of region servers.
 * <p>
 * Servers are distinguished in two different ways.  A given server has a
 * location, specified by hostname and port, and of which there can only be one
 * online at any given time.  A server instance is specified by the location
 * (hostname and port) as well as the startcode (timestamp from when the server
 * was started).  This is used to differentiate a restarted instance of a given
 * server from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead.  The dead
 * server needs to be handled by a ServerShutdownHandler.  If the handler is not
 * enabled yet, the server can't be handled right away so it is queued up.
 * After the handler is enabled, the server will be submitted to a shutdown
 * handler for processing.
 */
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  // Set if we are to shutdown the cluster.
  private volatile boolean clusterShutdown = false;

  /** The last flushed sequence id for a region. */
  private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load. */
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
    new ConcurrentHashMap<ServerName, ServerLoad>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces we use to
   * control regionservers out on the cluster.
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
    new HashMap<ServerName, AdminService.BlockingInterface>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers =
    new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  private final HConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RetryCounterFactory pingRetryCounterFactory;

  /**
   * Set of region servers which are dead but not processed immediately. If one
   * server died before master enables ServerShutdownHandler, the server will be
   * added to this set and will be processed through calling
   * {@link ServerManager#processQueuedDeadServers()} by master.
   * <p>
   * A dead server is a server instance known to be dead, not listed in the /hbase/rs
   * znode any more. It may have not been submitted to ServerShutdownHandler yet
   * because the handler is not enabled.
   * <p>
   * So this is a set of region servers known to be dead but not submitted to
   * ServerShutdownHandler for processing yet.
   */
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();

  /**
   * Set of region servers which are dead and submitted to ServerShutdownHandler to
   * process but not fully processed immediately.
   * <p>
   * If one server died before assignment manager finished the failover cleanup, the
   * server will be added to this set and will be processed through calling
   * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
   * <p>
   * The Boolean value indicates whether log split is needed inside ServerShutdownHandler.
   * <p>
   * ServerShutdownHandler processes a dead server submitted to the handler after the
   * handler is enabled. It may not be able to complete the processing because meta is not
   * yet online or master is currently in startup mode. In this case, the dead server will
   * be parked in this set temporarily.
   */
  private Map<ServerName, Boolean> requeuedDeadServers
    = new ConcurrentHashMap<ServerName, Boolean>();

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();

  /**
   * Constructor.
   * @param master the master controlling this ServerManager
   * @param services master services (assignment manager, executor service, etc.)
   * @throws IOException
   */
  public ServerManager(final Server master, final MasterServices services)
      throws IOException {
    this(master, services, true);
  }

  @SuppressWarnings("deprecation")
  ServerManager(final Server master, final MasterServices services,
      final boolean connect) throws IOException {
    this.master = master;
    this.services = services;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = connect ? HConnectionManager.getConnection(c) : null;
    int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.maximum.ping.server.attempts", 10));
    int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.ping.server.retry.sleep.interval", 100));
    this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
  }
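
  // Illustrative sketch (not part of the original source): the clock-skew and ping-retry
  // settings read by the constructor above can be tuned through the master Configuration
  // before the ServerManager is built. The key names come from the constructor; the
  // values below are made-up examples.
  //
  //   Configuration conf = master.getConfiguration();
  //   conf.setLong("hbase.master.maxclockskew", 30000);              // reject if skew > 30 s
  //   conf.setLong("hbase.master.warningclockskew", 10000);          // warn if skew > 10 s
  //   conf.setInt("hbase.master.maximum.ping.server.attempts", 10);  // isServerReachable retries
  //   conf.setInt("hbase.master.ping.server.retry.sleep.interval", 100);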

  /**
   * Add the listener to the notification list.
   * @param listener The ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The ServerListener to unregister
   * @return true if the listener was found and removed
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

  /**
   * Let the server manager know a new regionserver has come online.
   * @param ia The remote address
   * @param port The remote port
   * @param serverStartcode the startcode reported by the regionserver
   * @param serverCurrentTime The current time of the region server in ms
   * @return The ServerName we know this server as.
   * @throws IOException
   */
  ServerName regionServerStartup(final InetAddress ia, final int port,
      final long serverStartcode, long serverCurrentTime)
  throws IOException {
    // Test for case where we get a region startup message from a regionserver
    // that has been quickly restarted but whose znode expiration handler has
    // not handled, yet, the (old) server shutdown.  If this happens, the server
    // manager will think the server is dead and reject it unless we check the
    // startcodes.
    ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode);
    checkClockSkew(sn, serverCurrentTime);
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }
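
  // Illustrative sketch (assumption, not original code): how a master-side RPC handler is
  // expected to call regionServerStartup() when a regionserver checks in. The request
  // accessors below are hypothetical names, not the real protobuf API.
  //
  //   InetAddress rpcAddress = ...;                 // remote address of the reporting server
  //   int port = request.getPort();                 // port it listens on (hypothetical accessor)
  //   long startcode = request.getServerStartCode();
  //   long rsTime = request.getServerCurrentTime();
  //   ServerName registered =
  //       serverManager.regionServerStartup(rpcAddress, port, startcode, rsTime);
  //   // ClockOutOfSyncException or YouAreDeadException propagate back to the regionserver.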

  /**
   * Updates last flushed sequence ids of all regions hosted on the server {@code sn}.
   * @param sn the server that reported
   * @param hsl the load the server reported
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompleteSequenceId();
      if (existingValue != null) {
        if (l != -1 && l < existingValue) {
          LOG.warn("RegionServer " + sn +
            " indicates a last flushed sequence id (" + l +
            ") that is less than the previous last flushed sequence id (" +
            existingValue + ") for region " +
            Bytes.toString(entry.getKey()) + " Ignoring.");
          // Don't let smaller sequence ids override greater sequence ids.
          continue;
        }
      }
      flushedSequenceIdByRegion.put(encodedRegionName, l);
    }
  }

  void regionServerReport(ServerName sn,
      ServerLoad sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and its just a different start code?
      // Just let the server in. Presume master joining a running cluster.
      // recordNewServer is what happens at the end of regionServerStartup.
      // The only thing we are skipping is passing back to the regionserver
      // the ServerName to use. Here we presume a master has already done
      // that so we'll press on with whatever it gave us for ServerName.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return; // Not recorded, so no need to move on
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

  /**
   * Check if a server with the same hostname and port is already registered. If not, or if
   * the existing registration has a smaller startcode, record the new server.
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise false
   */
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // Note that we assume the same startcode means the same server instance, and don't
    // expire the existing registration in that case.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }
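
  // Illustrative sketch (not original code; host/port/startcode values are made up): the
  // startcode is what distinguishes two incarnations of the same host:port. Given the logic
  // above, a report from a newer incarnation both registers the new instance and expires the
  // stale one, while a report from an older incarnation is rejected.
  //
  //   ServerName oldRs = ServerName.valueOf("rs1.example.com", 60020, 1000L); // already online
  //   ServerName newRs = ServerName.valueOf("rs1.example.com", 60020, 2000L); // restarted
  //   // checkAndRecordNewServer(newRs, load) -> records newRs, then expireServer(oldRs)
  //   // checkAndRecordNewServer(oldRs, load) once newRs is online -> returns false (rejected)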

  /**
   * Checks whether the clock skew between the server and the master is within the allowed
   * limit. If the skew exceeds the configured max ("hbase.master.maxclockskew"), an exception
   * is thrown; if it exceeds the warning threshold, a warning is logged but the server is
   * let in.
   * @param serverName Incoming server's name
   * @param serverCurrentTime the current time of the region server in ms
   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
   */
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
  throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master. " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew){
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }
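
  // Worked example (illustrative numbers): with the defaults read in the constructor,
  // maxSkew = 30000 ms and warningSkew = 10000 ms.
  //
  //   long masterNow = 1_000_000_000L;              // System.currentTimeMillis() on the master
  //   long rsReported = masterNow - 45_000L;        // regionserver clock 45 s behind
  //   long skew = Math.abs(masterNow - rsReported); // 45000
  //   // skew > 30000 -> ClockOutOfSyncException, server rejected.
  //   // A 15 s difference would only log a warning; under 10 s is silently accepted.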

  /**
   * If this server is on the dead list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new start code, remove the old entry
   * from the dead list.
   * @param serverName the server to check
   * @param what START or REPORT
   * @throws YouAreDeadException
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Host name, port and start code all match an entry on the dead server list;
      // this is the exact instance we are currently processing as dead, so reject it.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Once the master is initialized, drop any dead-list entry with the same hostname and
    // port as the newly checking-in regionserver: it came back with a new startcode.
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }

  /**
   * Looks for an online server registered under the same hostname and port as
   * {@code serverName}. Assumes onlineServers is locked.
   * @return the ServerName with matching hostname and port, or null if none found
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    for (ServerName sn: this.onlineServers.keySet()) {
      if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
    }
    return null;
  }

  /**
   * Adds the server to the online servers list. Assumes onlineServers is locked.
   * @param serverName The remote server's name.
   * @param sl the load reported by the server
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
    LOG.info("Registering server=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  public long getLastFlushedSequenceId(byte[] encodedRegionName) {
    long seqId = -1L;
    if (flushedSequenceIdByRegion.containsKey(encodedRegionName)) {
      seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    }
    return seqId;
  }
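
  // Illustrative sketch (not original code): region servers report the highest flushed
  // sequence id per region via regionServerReport(), and log splitting or replay can then
  // look it up by the region's encoded name. The region variable below is hypothetical.
  //
  //   HRegionInfo region = ...;                                   // some user region
  //   byte[] encodedName = Bytes.toBytes(region.getEncodedName());
  //   long lastFlushed = serverManager.getLastFlushedSequenceId(encodedName);
  //   // -1 means no flush has been reported for this region yet;
  //   // edits with a sequence id <= lastFlushed were already persisted.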

  /**
   * @param serverName server to look up
   * @return ServerLoad if serverName is known, else null
   */
  public ServerLoad getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load, or 0 if no servers are online
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerLoad sl: this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getNumberOfRegions();
    }
    // Guard against divide-by-zero (NaN) when no servers have checked in yet.
    return numServers == 0 ? 0 : (double)totalLoad / (double)numServers;
  }
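
  // Worked example (illustrative numbers): three online servers carrying 10, 20 and 30
  // regions give totalLoad = 60 and numServers = 3, so getAverageLoad() returns 20.0.
  // With no servers checked in it returns 0 rather than 0/0 (NaN).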

  /** @return the count of active regionservers */
  int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map
    return this.onlineServers.size();
  }

  /**
   * @return Read-only map of online servers to their load.
   */
  public Map<ServerName, ServerLoad> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any RS are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

  void letRegionServersShutdown() {
    long previousLogTime = 0;
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      // Log the remaining servers at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        StringBuilder sb = new StringBuilder();
        // It's ok here to not sync on onlineServers - we are merely logging.
        for (ServerName key : this.onlineServers.keySet()) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

  /*
   * Expire the passed server.  Add it to the list of dead servers and queue a
   * shutdown processing.
   */
  public synchronized void expireServer(final ServerName serverName) {
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
          + "delay expiring server " + serverName);
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      // Server shutdown is already being processed; nothing more to do.
      LOG.warn("Expiration of " + serverName +
          " but server shutdown already in progress");
      return;
    }
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(serverName)) {
        LOG.warn("Expiration of " + serverName + " but server not online");
      }
      // Remove the server from the known servers lists and update load info BUT
      // add to deadservers first; do this so it'll show in dead servers list if
      // not in online servers list.
      this.deadservers.add(serverName);
      this.onlineServers.remove(serverName);
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(serverName);

    // If cluster is going down, yes, servers are going to be expiring; don't
    // process as a dead server.
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
    if (carryingMeta) {
      this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName));
    } else {
      this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName, true));
    }
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Tell our listeners that a server was removed
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }

  public synchronized void processDeadServer(final ServerName serverName) {
    this.processDeadServer(serverName, false);
  }

  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
    // When the assignment manager is cleaning up the zookeeper nodes and rebuilding the
    // in-memory region states, region servers could be down. Meta can and should be
    // re-assigned, and log splitting can be done too. However, it is better to wait until
    // the cleanup is done before re-assigning user regions.
    //
    // We should not wait in the server shutdown handler thread since it can clog up the
    // handler threads, and meta could not be re-assigned in case the corresponding server
    // is down. So we queue the servers up here instead.
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitHlog);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
          shouldSplitHlog));
  }

  /**
   * Process the servers which died during master's initialization. It will be
   * called after meta is assigned and the assignment manager has joined the cluster.
   */
  synchronized void processQueuedDeadServers() {
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master hasn't enabled ServerShutdownHandler");
    }
    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
    while (serverIterator.hasNext()) {
      ServerName tmpServerName = serverIterator.next();
      expireServer(tmpServerName);
      serverIterator.remove();
      requeuedDeadServers.remove(tmpServerName);
    }

    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
    }

    for (ServerName tmpServerName : requeuedDeadServers.keySet()) {
      processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
    }
    requeuedDeadServers.clear();
  }

  /*
   * Remove the server from the drain list.
   */
  public boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers list.
    return this.drainingServers.remove(sn);
  }

  /*
   * Add the server to the drain list.
   */
  public boolean addServerToDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.  ServerName is of the form:
    // <hostname> , <port> , <startcode>
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
               "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers list, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
               "Ignoring request to add it again.");
      return false;
    }
    return this.drainingServers.add(sn);
  }
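
  // Illustrative sketch (not original code): the drain list keeps a server online but makes
  // createDestinationServersList() stop handing it new regions, which is how a graceful
  // decommission typically starts. The server name below is made up.
  //
  //   ServerName toDecommission = ServerName.valueOf("rs2.example.com", 60020, 1423001234000L);
  //   if (serverManager.addServerToDrainList(toDecommission)) {
  //     // move regions off the server, then stop it; if plans change:
  //     serverManager.removeServerFromDrainList(toDecommission);
  //   }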

  // RPC methods to region servers

  /**
   * Sends an OPEN RPC to the specified server to open the specified region.
   * <p>
   * Open should not fail but can if the server just crashed.
   * @param server server to open a region
   * @param region region to open
   * @param versionOfOfflineNode the znode version that needs to be present in the offline
   *   node when the RS tries to change the state from OFFLINE to other states
   * @param favoredNodes the favored nodes for this region, if any
   * @return the opening state reported by the region server
   */
  public RegionOpeningState sendRegionOpen(final ServerName server,
      HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
  throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return RegionOpeningState.FAILED_OPENING;
    }
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
      region, versionOfOfflineNode, favoredNodes,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningState(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Sends an OPEN RPC to the specified server to open the specified regions.
   * <p>
   * Open should not fail but can if the server just crashed.
   * @param server server to open the regions on
   * @param regionOpenInfos info of the list of regions to open
   * @return a list of region opening states, or null if no RPC connection is found
   */
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
  throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return null;
    }

    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Sends a CLOSE RPC to the specified server to close the specified region.
   * <p>
   * A region server could reject the close request because it either does not
   * have the specified region or the region is being split.
   * @param server server to close the region on
   * @param region region to close
   * @param versionOfClosingNode the version of znode to compare when the RS transitions the
   *   znode from CLOSING state
   * @param dest if the region is moved to another server, the destination server; null otherwise
   * @param transitionInZK true if we are to update zk about the region close; if the close
   *   was orchestrated by the master, then update zk.  If the close is being run by the
   *   regionserver because say it is crashing down, don't update zk.
   * @return true if the server acknowledged the close, false if not
   * @throws IOException
   */
  public boolean sendRegionClose(ServerName server, HRegionInfo region,
      int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
    if (server == null) throw new NullPointerException("Passed server is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send CLOSE RPC to server " +
        server.toString() + " for region " +
        region.getRegionNameAsString() +
        " failed because no RPC connection found to this server");
    }
    return ProtobufUtil.closeRegion(admin, server, region.getRegionName(),
      versionOfClosingNode, dest, transitionInZK);
  }

  public boolean sendRegionClose(ServerName server,
      HRegionInfo region, int versionOfClosingNode) throws IOException {
    return sendRegionClose(server, region, versionOfClosingNode, null, true);
  }

  /**
   * Sends a MERGE REGIONS RPC to the specified server to merge the specified regions.
   * <p>
   * A region server could reject the request if it does not have one of the regions.
   * @param server server to merge the regions on
   * @param region_a region to merge
   * @param region_b region to merge
   * @param forcible true for a compulsory merge; otherwise only two adjacent regions are merged
   * @throws IOException
   */
  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
      HRegionInfo region_b, boolean forcible) throws IOException {
    if (server == null)
      throw new NullPointerException("Passed server is null");
    if (region_a == null || region_b == null)
      throw new NullPointerException("Passed region is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
          + server.toString() + " for region "
          + region_a.getRegionNameAsString() + ","
          + region_b.getRegionNameAsString()
          + " failed because no RPC connection found to this server");
    }
    ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
  }

  /**
   * Check if a region server is reachable and has the expected start code.
   */
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");

    RetryCounter retryCounter = pingRetryCounterFactory.create();
    while (retryCounter.shouldRetry()) {
      synchronized (this.onlineServers) {
        if (this.deadservers.isDeadServer(server)) {
          return false;
        }
      }
      try {
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(admin);
          return info != null && info.hasServerName()
            && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (RegionServerStoppedException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server, e);
        }
        break;
      } catch (ServerNotRunningYetException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server, e);
        }
        break;
      } catch (IOException ioe) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
            + retryCounter.getMaxAttempts(), ioe);
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch(InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return false;
  }
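
  // Illustrative note (not original code): the retry behavior above comes from the
  // pingRetryCounterFactory built in the constructor. With the defaults
  // ("hbase.master.maximum.ping.server.attempts" = 10,
  //  "hbase.master.ping.server.retry.sleep.interval" = 100), an unreachable server is pinged
  // up to 10 times, sleeping between IOException failures (starting at ~100 ms), before
  // isServerReachable() gives up and returns false. RegionServerStoppedException and
  // ServerNotRunningYetException abort the loop immediately.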

  /**
   * @param sn the regionserver to get an admin interface for
   * @return Admin interface for the remote regionserver named {@code sn}
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if failed
   */
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
  throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      admin = this.connection.getAdmin(sn);
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

  /**
   * Wait for the region servers to report in.
   * We will wait until one of these conditions is met:
   *  - the master is stopped
   *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
   *    region servers is reached
   *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
   *    there has been no new region server for
   *    'hbase.master.wait.on.regionservers.interval' time AND
   *    the 'hbase.master.wait.on.regionservers.timeout' is reached
   *
   * @throws InterruptedException
   */
  public void waitForRegionServers(MonitoredTask status)
  throws InterruptedException {
    final long interval = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int minToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    if (minToStart < 1) {
      LOG.warn(String.format(
        "The value of '%s' (%d) can not be less than 1, ignoring.",
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format(
        "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    while (
      !this.master.isStopped() &&
        count < maxToStart &&
        (lastCountChange + interval > now || timeout > slept || count < minToStart)
      ) {
      // Log some info at every interval time or if there is a change
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting for region servers count to settle; currently" +
            " checked in " + count + ", slept for " + slept + " ms," +
            " expecting minimum of " + minToStart + ", maximum of " + maxToStart +
            ", timeout of " + timeout + " ms, interval of " + interval + " ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // We sleep for some time
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
      " checked in " + count + ", slept for " + slept + " ms," +
      " expecting minimum of " + minToStart + ", maximum of " + maxToStart + "," +
      " master is " + (this.master.isStopped() ? "stopped." : "running.")
    );
  }
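
  // Illustrative sketch (keys are the constants defined at the top of this class; the values
  // are made-up examples): a cluster that should not begin assignment until at least 3
  // regionservers have checked in, but gives up waiting for more after 30 s of no change.
  //
  //   Configuration conf = master.getConfiguration();
  //   conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
  //   conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 10);
  //   conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_TIMEOUT, 30000);   // ms
  //   conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_INTERVAL, 1500);   // ms between changes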

  /**
   * @return A copy of the internal list of online servers.
   */
  public List<ServerName> getOnlineServersList() {
    // Returns a defensive copy; callers may mutate the returned list freely.
    return new ArrayList<ServerName>(this.onlineServers.keySet());
  }

  /**
   * @return A copy of the internal list of draining servers.
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<ServerName>(this.drainingServers);
  }

  /**
   * @return A copy of the internal set of dead-but-not-yet-expired servers.
   */
  Set<ServerName> getDeadNotExpiredServers() {
    return new HashSet<ServerName>(this.queuedDeadServers);
  }

  /**
   * During startup, if we figure it is not a failover, i.e. there are no more HLog files
   * to split, we won't try to recover these dead servers, so we just remove them from the
   * queue. Use caution in calling this.
   */
  void removeRequeuedDeadServers() {
    requeuedDeadServers.clear();
  }

  /**
   * @return An unmodifiable view of the internal map of requeued dead servers and their
   *         corresponding split-log-needed flag.
   */
  Map<ServerName, Boolean> getRequeuedDeadServers() {
    return Collections.unmodifiableMap(this.requeuedDeadServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  /**
   * Check if a server is known to be dead.  A server can be online,
   * or known to be dead, or unknown to this manager (i.e., not online and
   * not known to be dead either; it is simply not tracked by the
   * master any more, for example, a very old previous instance).
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName)
      || queuedDeadServers.contains(serverName)
      || requeuedDeadServers.containsKey(serverName);
  }
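
  // Illustrative sketch (not original code): isServerOnline() and isServerDead() are not
  // complements. A ServerName can be (a) online, (b) dead or queued dead, or (c) simply
  // unknown to the master, in which case both methods return false. The server name below
  // is made up.
  //
  //   ServerName sn = ServerName.valueOf("rs3.example.com", 60020, 1423009999000L);
  //   if (serverManager.isServerOnline(sn)) {
  //     // safe to target with region assignments
  //   } else if (serverManager.isServerDead(sn)) {
  //     // expiration / shutdown handling is pending or done; do not assign to it
  //   } else {
  //     // never seen (or long forgotten) by this master
  //   }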

  public void shutdownCluster() {
    this.clusterShutdown = true;
    this.master.stop("Cluster shutdown requested");
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown;
  }

  /**
   * Stop the ServerManager.  Currently closes the connection to the cluster.
   */
  public void stop() {
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOG.error("Attempt to close connection to master failed", e);
      }
    }
  }

  /**
   * Creates a list of possible destinations for a region. It contains the online servers,
   * but not the draining or dying servers.
   * @param serverToExclude can be null if there is no server to exclude
   */
  public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
    final List<ServerName> destServers = getOnlineServersList();

    if (serverToExclude != null){
      destServers.remove(serverToExclude);
    }

    // Loop through the draining server list and remove them from the server list.
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    if (!drainingServersCopy.isEmpty()) {
      for (final ServerName server: drainingServersCopy) {
        destServers.remove(server);
      }
    }

    // Remove the dead-but-not-expired servers from the server list.
    removeDeadNotExpiredServers(destServers);

    return destServers;
  }
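
  // Illustrative sketch (not original code): this is the list a balancer or assignment step
  // would draw candidate servers from. Excluding the source server is what a region move does.
  //
  //   ServerName source = ...;                                  // server a region is moving off
  //   List<ServerName> candidates =
  //       serverManager.createDestinationServersList(source);   // online, minus draining/dying
  //   if (candidates.isEmpty()) {
  //     // nowhere to put the region right now; the caller has to retry or give up
  //   }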

  /**
   * Calls {@link #createDestinationServersList} without a server to exclude.
   */
  public List<ServerName> createDestinationServersList(){
    return createDestinationServersList(null);
  }

  /**
   * Loop through the dead-but-not-expired server list and remove them from {@code servers}.
   * This function should be used carefully outside of this class. You should use a high-level
   * method such as {@link #createDestinationServersList()} instead of managing your own list.
   */
  void removeDeadNotExpiredServers(List<ServerName> servers) {
    Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
    if (!deadNotExpiredServersCopy.isEmpty()) {
      for (ServerName server : deadNotExpiredServersCopy) {
        LOG.debug("Removing dead but not expired server: " + server
          + " from eligible server pool.");
        servers.remove(server);
      }
    }
  }

  /**
   * Clear any dead server with the same host name and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }
}