
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.SortedMap;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentSkipListMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.ClockOutOfSyncException;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.RegionLoad;
44  import org.apache.hadoop.hbase.Server;
45  import org.apache.hadoop.hbase.ServerLoad;
46  import org.apache.hadoop.hbase.ServerName;
47  import org.apache.hadoop.hbase.YouAreDeadException;
48  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
49  import org.apache.hadoop.hbase.client.HConnection;
50  import org.apache.hadoop.hbase.client.HConnectionManager;
51  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
52  
54  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
55  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
56  
57  import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
58  import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
59  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.RequestConverter;
62  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
63  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
64  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
66  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
67  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
68  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
69  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.apache.hadoop.hbase.util.Triple;
72  import org.apache.hadoop.hbase.util.RetryCounter;
73  import org.apache.hadoop.hbase.util.RetryCounterFactory;
74  
75  import com.google.common.annotations.VisibleForTesting;
76  import com.google.protobuf.ServiceException;
77  
78  /**
79   * The ServerManager class manages info about region servers.
80   * <p>
81   * Maintains lists of online and dead servers.  Processes the startups,
82   * shutdowns, and deaths of region servers.
83   * <p>
84   * Servers are distinguished in two different ways.  A given server has a
85   * location, specified by hostname and port, of which there can be only one
86   * online at any given time.  A server instance is specified by the location
87   * (hostname and port) as well as the startcode (timestamp from when the server
88   * was started).  This is used to differentiate a restarted instance of a given
89   * server from the original instance.
90   * <p>
91   * If a server is known not to be running any more, it is called dead. The dead
92   * server needs to be handled by a ServerShutdownHandler.  If the handler is not
93   * enabled yet, the server can't be handled right away, so it is queued up.
94   * After the handler is enabled, the server will be submitted to the handler for processing.
95   * However, the handler may be only partially enabled.  If so,
96   * the server cannot be fully processed, and is queued up for further processing.
97   * A server is fully processed only after the handler is fully enabled
98   * and has completed the handling.
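     *
     * <p>
     * A rough usage sketch (illustrative only; {@code master}, {@code services} and
     * {@code candidate} are placeholders supplied by the caller; in practice the
     * master itself constructs and drives the ServerManager):
     * <pre>{@code
     * ServerManager sm = new ServerManager(master, services);
     * List<ServerName> online = sm.getOnlineServersList();   // copy of the online server list
     * if (!sm.isServerOnline(candidate) && sm.isServerDead(candidate)) {
     *   // candidate is dead, or is queued up for ServerShutdownHandler processing
     * }
     * }</pre>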
99   */
100 @InterfaceAudience.Private
101 public class ServerManager {
102   public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
103       "hbase.master.wait.on.regionservers.maxtostart";
104 
105   public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
106       "hbase.master.wait.on.regionservers.mintostart";
107 
108   public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
109       "hbase.master.wait.on.regionservers.timeout";
110 
111   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
112       "hbase.master.wait.on.regionservers.interval";
113 
114   private static final Log LOG = LogFactory.getLog(ServerManager.class);
115 
116   // Set if we are to shutdown the cluster.
117   private volatile boolean clusterShutdown = false;
118 
119   private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
120     new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
121 
122   /** Map of registered servers to their current load */
123   private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
124     new ConcurrentHashMap<ServerName, ServerLoad>();
125 
126   /**
127    * Map of admin interfaces per registered regionserver; these are the interfaces used to
128    * control regionservers out on the cluster.
129    */
130   private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
131     new HashMap<ServerName, AdminService.BlockingInterface>();
132 
133   /**
134    * List of region servers ({@link ServerName}) that should not get any more new
135    * regions.
136    */
137   private final ArrayList<ServerName> drainingServers =
138     new ArrayList<ServerName>();
139 
140   private final Server master;
141   private final MasterServices services;
142   private final HConnection connection;
143 
144   private final DeadServer deadservers = new DeadServer();
145 
146   private final long maxSkew;
147   private final long warningSkew;
148 
149   private final RetryCounterFactory pingRetryCounterFactory;
150 
151   /**
152    * Set of region servers which are dead but not processed immediately. If one
153    * server died before master enables ServerShutdownHandler, the server will be
154    * added to this set and will be processed through calling
155    * {@link ServerManager#processQueuedDeadServers()} by master.
156    * <p>
157    * A dead server is a server instance known to be dead, not listed in the /hbase/rs
158    * znode any more. It may not have been submitted to ServerShutdownHandler yet
159    * because the handler is not enabled.
160    * <p>
161    * A dead server, which has been submitted to ServerShutdownHandler while the
162    * handler is not enabled, is queued up.
163    * <p>
164    * So this is a set of region servers known to be dead but not submitted to
165    * ServerShutdownHandler for processing yet.
166    */
167   private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
168 
169   /**
170    * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
171    * fully processed immediately.
172    * <p>
173    * If one server died before assignment manager finished the failover cleanup, the server will be
174    * added to this set and will be processed through calling
175    * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
176    * <p>
177    * The Boolean value indicates whether log split is needed inside ServerShutdownHandler
178    * <p>
179    * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
180    * enabled. It may not be able to complete the processing because meta is not yet online or master
181    * is currently in startup mode. In this case, the dead server will be parked in this set
182    * temporarily.
183    */
184   private Map<ServerName, Boolean> requeuedDeadServers
185     = new ConcurrentHashMap<ServerName, Boolean>();
186 
187   /** Listeners that are called on server events. */
188   private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
189 
190   /**
191    * Constructor.
192    * @param master
193    * @param services
194    * @throws ZooKeeperConnectionException
195    */
196   public ServerManager(final Server master, final MasterServices services)
197       throws IOException {
198     this(master, services, true);
199   }
200 
201   @SuppressWarnings("deprecation")
202   ServerManager(final Server master, final MasterServices services,
203       final boolean connect) throws IOException {
204     this.master = master;
205     this.services = services;
206     Configuration c = master.getConfiguration();
207     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
208     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
209     this.connection = connect ? HConnectionManager.getConnection(c) : null;
210     int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
211       "hbase.master.maximum.ping.server.attempts", 10));
212     int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
213       "hbase.master.ping.server.retry.sleep.interval", 100));
214     this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
215   }
216 
217   /**
218    * Add the listener to the notification list.
219    * @param listener The ServerListener to register
220    */
221   public void registerListener(final ServerListener listener) {
222     this.listeners.add(listener);
223   }
224 
225   /**
226    * Remove the listener from the notification list.
227    * @param listener The ServerListener to unregister
228    */
229   public boolean unregisterListener(final ServerListener listener) {
230     return this.listeners.remove(listener);
231   }
232 
233   /**
234    * Let the server manager know a new regionserver has come online
235    * @param ia The remote address
236    * @param port The remote port
237    * @param serverStartcode
238    * @param serverCurrentTime The current time of the region server in ms
239    * @return The ServerName we know this server as.
240    * @throws IOException
241    */
242   ServerName regionServerStartup(final InetAddress ia, final int port,
243     final long serverStartcode, long serverCurrentTime)
244   throws IOException {
245     // Test for case where we get a region startup message from a regionserver
246     // that has been quickly restarted but whose znode expiration handler has
247     // not yet run, or from a server whose fail we are currently processing.
248     // Test its host+port combo is present in serverAddresstoServerInfo.  If it
249     // is, reject the server and trigger its expiration. The next time it comes
250     // in, it should have been removed from serverAddressToServerInfo and queued
251     // for processing by ProcessServerShutdown.
252     ServerName sn = ServerName.valueOf(ia.getHostName(), port, serverStartcode);
253     checkClockSkew(sn, serverCurrentTime);
254     checkIsDead(sn, "STARTUP");
255     if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
256       LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
257         + " could not record the server: " + sn);
258     }
259     return sn;
260   }
261 
262   /**
263    * Updates last flushed sequence Ids for the regions on server sn
264    * @param sn
265    * @param hsl
266    */
267   private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
268     Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
269     for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
270       byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
271       Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
272       long l = entry.getValue().getCompleteSequenceId();
273       if (existingValue != null) {
274         if (l != -1 && l < existingValue) {
275           LOG.warn("RegionServer " + sn +
276               " indicates a last flushed sequence id (" + l +
277               ") that is less than the previous last flushed sequence id (" +
278               existingValue + ") for region " +
279               Bytes.toString(entry.getKey()) + ". Ignoring.");
280 
281           continue; // Don't let smaller sequence ids override greater sequence ids.
282         }
283       }
284       flushedSequenceIdByRegion.put(encodedRegionName, l);
285     }
286   }
287 
288   void regionServerReport(ServerName sn,
289       ServerLoad sl) throws YouAreDeadException {
290     checkIsDead(sn, "REPORT");
291     if (null == this.onlineServers.replace(sn, sl)) {
292       // Already have this host+port combo and it's just a different start code?
293       // Just let the server in. Presume master joining a running cluster.
294       // recordNewServer is what happens at the end of reportServerStartup.
295       // The only thing we are skipping is passing back to the regionserver
296       // the ServerName to use. Here we presume a master has already done
297       // that so we'll press on with whatever it gave us for ServerName.
298       if (!checkAndRecordNewServer(sn, sl)) {
299         LOG.info("RegionServerReport ignored, could not record the server: " + sn);
300         return; // Not recorded, so no need to move on
301       }
302     }
303     updateLastFlushedSequenceIds(sn, sl);
304   }
305 
306   /**
307    * Checks if a server with the same host and port already exists;
308    * if not, or if the existing one has a smaller start code, records it.
309    *
310    * @param serverName the server to check and record
311    * @param sl the server load on the server
312    * @return true if the server is recorded, otherwise, false
313    */
314   boolean checkAndRecordNewServer(
315       final ServerName serverName, final ServerLoad sl) {
316     ServerName existingServer = null;
317     synchronized (this.onlineServers) {
318       existingServer = findServerWithSameHostnamePortWithLock(serverName);
319       if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
320         LOG.info("Server serverName=" + serverName + " rejected; we already have "
321             + existingServer.toString() + " registered with same hostname and port");
322         return false;
323       }
324       recordNewServerWithLock(serverName, sl);
325     }
326 
327     // Tell our listeners that a server was added
328     if (!this.listeners.isEmpty()) {
329       for (ServerListener listener : this.listeners) {
330         listener.serverAdded(serverName);
331       }
332     }
333 
334     // Note that we assume that same ts means same server, and don't expire in that case.
335     //  TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
336     if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
337       LOG.info("Triggering server recovery; existingServer " +
338           existingServer + " looks stale, new server:" + serverName);
339       expireServer(existingServer);
340     }
341     return true;
342   }
343 
344   /**
345    * Checks the clock skew between the server and the master. If the clock skew exceeds the
346    * configured max, it will throw an exception; if it exceeds the configured warning threshold,
347    * it will log a warning but start normally.
348    * @param serverName Incoming server's name
349    * @param serverCurrentTime
350    * @throws ClockOutOfSyncException if the skew exceeds the configured max value
351    */
352   private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
353   throws ClockOutOfSyncException {
354     long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
355     if (skew > maxSkew) {
356       String message = "Server " + serverName + " has been " +
357         "rejected; Reported time is too far out of sync with master.  " +
358         "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
359       LOG.warn(message);
360       throw new ClockOutOfSyncException(message);
361     } else if (skew > warningSkew){
362       String message = "Reported time for server " + serverName + " is out of sync with master " +
363         "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
364         "error threshold is " + maxSkew + "ms)";
365       LOG.warn(message);
366     }
367   }
368 
369   /**
370    * If this server is on the dead list, reject it with a YouAreDeadException.
371    * If it was dead but came back with a new start code, remove the old entry
372    * from the dead list.
373    * @param serverName
374    * @param what START or REPORT
375    * @throws org.apache.hadoop.hbase.YouAreDeadException
376    */
377   private void checkIsDead(final ServerName serverName, final String what)
378       throws YouAreDeadException {
379     if (this.deadservers.isDeadServer(serverName)) {
380       // host name, port and start code all match with existing one of the
381       // dead servers. So, this server must be dead.
382       String message = "Server " + what + " rejected; currently processing " +
383           serverName + " as dead server";
384       LOG.debug(message);
385       throw new YouAreDeadException(message);
386     }
387     // remove dead server with same hostname and port of newly checking in rs after master
388     // initialization. See HBASE-5916 for more information.
389     if ((this.services == null || ((HMaster) this.services).isInitialized())
390         && this.deadservers.cleanPreviousInstance(serverName)) {
391       // This server has now become alive after we marked it as dead.
392       // We removed its previous entry from the dead list to reflect it.
393       LOG.debug(what + ":" + " Server " + serverName + " came back up," +
394           " removed it from the dead servers list");
395     }
396   }
397 
398   /**
399    * Assumes onlineServers is locked.
400    * @return ServerName with matching hostname and port.
401    */
402   private ServerName findServerWithSameHostnamePortWithLock(
403       final ServerName serverName) {
404     for (ServerName sn: this.onlineServers.keySet()) {
405       if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
406     }
407     return null;
408   }
409 
410   /**
411    * Adds to the onlineServers list; onlineServers should be locked. Any existing
412    * entry for the server is replaced and any cached admin connection is dropped.
413    * @param serverName The remote server's name.
414    * @param sl the server load reported by the server
415    */
416   @VisibleForTesting
417   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
418     LOG.info("Registering server=" + serverName);
419     this.onlineServers.put(serverName, sl);
420     this.rsAdmins.remove(serverName);
421   }
422 
423   public long getLastFlushedSequenceId(byte[] encodedRegionName) {
424     long seqId = -1L;
425     if (flushedSequenceIdByRegion.containsKey(encodedRegionName)) {
426       seqId = flushedSequenceIdByRegion.get(encodedRegionName);
427     }
428     return seqId;
429   }
430 
431   /**
432    * @param serverName
433    * @return ServerLoad if serverName is known else null
434    */
435   public ServerLoad getLoad(final ServerName serverName) {
436     return this.onlineServers.get(serverName);
437   }
438 
439   /**
440    * Compute the average load across all region servers.
441    * Currently, this uses a very naive computation - just uses the number of
442    * regions being served, ignoring stats about number of requests.
443    * @return the average load
444    */
445   public double getAverageLoad() {
446     int totalLoad = 0;
447     int numServers = 0;
448     double averageLoad;
449     for (ServerLoad sl: this.onlineServers.values()) {
450         numServers++;
451         totalLoad += sl.getNumberOfRegions();
452     }
453     averageLoad = numServers == 0 ? 0.0 : (double)totalLoad / (double)numServers;
454     return averageLoad;
455   }
456 
457   /** @return the count of active regionservers */
458   int countOfRegionServers() {
459     // Presumes onlineServers is a concurrent map
460     return this.onlineServers.size();
461   }
462 
463   /**
464    * @return Read-only map of servers to serverinfo
465    */
466   public Map<ServerName, ServerLoad> getOnlineServers() {
467     // Presumption is that iterating the returned Map is OK.
468     synchronized (this.onlineServers) {
469       return Collections.unmodifiableMap(this.onlineServers);
470     }
471   }
472 
473 
474   public DeadServer getDeadServers() {
475     return this.deadservers;
476   }
477 
478   /**
479    * Checks if any dead servers are currently being processed.
480    * @return true if any RS are being processed as dead, false if not
481    */
482   public boolean areDeadServersInProgress() {
483     return this.deadservers.areDeadServersInProgress();
484   }
485 
486   void letRegionServersShutdown() {
487     long previousLogTime = 0;
488     int onlineServersCt;
489     while ((onlineServersCt = onlineServers.size()) > 0) {
490 
491       if (System.currentTimeMillis() > (previousLogTime + 1000)) {
492         StringBuilder sb = new StringBuilder();
493         // It's ok here to not sync on onlineServers - merely logging
494         for (ServerName key : this.onlineServers.keySet()) {
495           if (sb.length() > 0) {
496             sb.append(", ");
497           }
498           sb.append(key);
499         }
500         LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
501         previousLogTime = System.currentTimeMillis();
502       }
503 
504       synchronized (onlineServers) {
505         try {
506           if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
507         } catch (InterruptedException ignored) {
508           // continue
509         }
510       }
511     }
512   }
513 
514   /*
515    * Expire the passed server.  Add it to list of dead servers and queue a
516    * shutdown processing.
517    */
518   public synchronized void expireServer(final ServerName serverName) {
519     if (!services.isServerShutdownHandlerEnabled()) {
520       LOG.info("ServerShutdownHandler is not enabled during master initialization; "
521           + "delaying expiration of server " + serverName);
522       this.queuedDeadServers.add(serverName);
523       return;
524     }
525     if (this.deadservers.isDeadServer(serverName)) {
526       // TODO: Can this happen?  It shouldn't be online in this case?
527       LOG.warn("Expiration of " + serverName +
528           " but server shutdown already in progress");
529       return;
530     }
531     synchronized (onlineServers) {
532       if (!this.onlineServers.containsKey(serverName)) {
533         LOG.warn("Expiration of " + serverName + " but server not online");
534       }
535       // Remove the server from the known servers lists and update load info BUT
536       // add to deadservers first; do this so it'll show in dead servers list if
537       // not in online servers list.
538       this.deadservers.add(serverName);
539       this.onlineServers.remove(serverName);
540       onlineServers.notifyAll();
541     }
542     this.rsAdmins.remove(serverName);
543     // If cluster is going down, yes, servers are going to be expiring; don't
544     // process as a dead server
545     if (this.clusterShutdown) {
546       LOG.info("Cluster shutdown set; " + serverName +
547         " expired; onlineServers=" + this.onlineServers.size());
548       if (this.onlineServers.isEmpty()) {
549         master.stop("Cluster shutdown set; onlineServer=0");
550       }
551       return;
552     }
553 
554     boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
555     if (carryingMeta) {
556       this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
557         this.services, this.deadservers, serverName));
558     } else {
559       this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
560         this.services, this.deadservers, serverName, true));
561     }
562     LOG.debug("Added=" + serverName +
563       " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
564 
565     // Tell our listeners that a server was removed
566     if (!this.listeners.isEmpty()) {
567       for (ServerListener listener : this.listeners) {
568         listener.serverRemoved(serverName);
569       }
570     }
571   }
572 
573   public synchronized void processDeadServer(final ServerName serverName) {
574     this.processDeadServer(serverName, false);
575   }
576 
577   public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitHlog) {
578     // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
579     // in-memory region states, region servers could be down. Meta table can and
580     // should be re-assigned, log splitting can be done too. However, it is better to
581     // wait till the cleanup is done before re-assigning user regions.
582     //
583     // We should not wait in the server shutdown handler thread since it can clog
584     // the handler threads and meta table could not be re-assigned in case
585     // the corresponding server is down. So we queue them up here instead.
586     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
587       requeuedDeadServers.put(serverName, shouldSplitHlog);
588       return;
589     }
590 
591     this.deadservers.add(serverName);
592     this.services.getExecutorService().submit(
593       new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
594           shouldSplitHlog));
595   }
596 
597   /**
598    * Process the servers which died during master's initialization. It will be
599    * called after HMaster#assignMeta and AssignmentManager#joinCluster.
600    */
601   synchronized void processQueuedDeadServers() {
602     if (!services.isServerShutdownHandlerEnabled()) {
603       LOG.info("Master hasn't enabled ServerShutdownHandler");
604     }
605     Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
606     while (serverIterator.hasNext()) {
607       ServerName tmpServerName = serverIterator.next();
608       expireServer(tmpServerName);
609       serverIterator.remove();
610       requeuedDeadServers.remove(tmpServerName);
611     }
612 
613     if (!services.getAssignmentManager().isFailoverCleanupDone()) {
614       LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
615     }
616 
617     for(ServerName tmpServerName : requeuedDeadServers.keySet()){
618       processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
619     }
620     requeuedDeadServers.clear();
621   }
622 
623   /*
624    * Remove the server from the drain list.
625    */
626   public boolean removeServerFromDrainList(final ServerName sn) {
627     // Warn if the server (sn) is not online.  ServerName is of the form:
628     // <hostname> , <port> , <startcode>
629 
630     if (!this.isServerOnline(sn)) {
631       LOG.warn("Server " + sn + " is not currently online. " +
632                "Removing from draining list anyway, as requested.");
633     }
634     // Remove the server from the draining servers lists.
635     return this.drainingServers.remove(sn);
636   }
637 
638   /*
639    * Add the server to the drain list.
640    */
641   public boolean addServerToDrainList(final ServerName sn) {
642     // Warn if the server (sn) is not online.  ServerName is of the form:
643     // <hostname> , <port> , <startcode>
644 
645     if (!this.isServerOnline(sn)) {
646       LOG.warn("Server " + sn + " is not currently online. " +
647                "Ignoring request to add it to draining list.");
648       return false;
649     }
650     // Add the server to the draining servers lists, if it's not already in
651     // it.
652     if (this.drainingServers.contains(sn)) {
653       LOG.warn("Server " + sn + " is already in the draining server list. " +
654                "Ignoring request to add it again.");
655       return false;
656     }
657     return this.drainingServers.add(sn);
658   }
659 
660   // RPC methods to region servers
661 
662   /**
663    * Sends an OPEN RPC to the specified server to open the specified region.
664    * <p>
665    * Open should not fail but can if server just crashed.
666    * <p>
667    * @param server server to open a region
668    * @param region region to open
669    * @param versionOfOfflineNode that needs to be present in the offline node
670    * when RS tries to change the state from OFFLINE to other states.
671    * @param favoredNodes
672    */
673   public RegionOpeningState sendRegionOpen(final ServerName server,
674       HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
675   throws IOException {
676     AdminService.BlockingInterface admin = getRsAdmin(server);
677     if (admin == null) {
678       LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
679         " failed because no RPC connection found to this server");
680       return RegionOpeningState.FAILED_OPENING;
681     }
682     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
683       region, versionOfOfflineNode, favoredNodes,
684       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
685     try {
686       OpenRegionResponse response = admin.openRegion(null, request);
687       return ResponseConverter.getRegionOpeningState(response);
688     } catch (ServiceException se) {
689       throw ProtobufUtil.getRemoteException(se);
690     }
691   }
692 
693   /**
694    * Sends an OPEN RPC to the specified server to open the specified region.
695    * <p>
696    * Open should not fail but can if server just crashed.
697    * <p>
698    * @param server server to open a region
699    * @param regionOpenInfos info of a list of regions to open
700    * @return a list of region opening states
701    */
702   public List<RegionOpeningState> sendRegionOpen(ServerName server,
703       List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
704   throws IOException {
705     AdminService.BlockingInterface admin = getRsAdmin(server);
706     if (admin == null) {
707       LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
708         " failed because no RPC connection found to this server");
709       return null;
710     }
711 
712     OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
713       (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
714     try {
715       OpenRegionResponse response = admin.openRegion(null, request);
716       return ResponseConverter.getRegionOpeningStateList(response);
717     } catch (ServiceException se) {
718       throw ProtobufUtil.getRemoteException(se);
719     }
720   }
721 
722   /**
723    * Sends a CLOSE RPC to the specified server to close the specified region.
724    * <p>
725    * A region server could reject the close request because it either does not
726    * have the specified region or the region is being split.
727    * @param server server to close a region on
728    * @param region region to close
729    * @param versionOfClosingNode
730    *   the version of znode to compare when RS transitions the znode from
731    *   CLOSING state.
732    * @param dest - if the region is moved to another server, the destination server. null otherwise.
733    * @return true if server acknowledged close, false if not
734    * @throws IOException
735    */
736   public boolean sendRegionClose(ServerName server, HRegionInfo region,
737     int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
738     if (server == null) throw new NullPointerException("Passed server is null");
739     AdminService.BlockingInterface admin = getRsAdmin(server);
740     if (admin == null) {
741       throw new IOException("Attempting to send CLOSE RPC to server " +
742         server.toString() + " for region " +
743         region.getRegionNameAsString() +
744         " failed because no RPC connection found to this server");
745     }
746     return ProtobufUtil.closeRegion(admin, server, region.getRegionName(),
747       versionOfClosingNode, dest, transitionInZK);
748   }
749 
750   public boolean sendRegionClose(ServerName server,
751       HRegionInfo region, int versionOfClosingNode) throws IOException {
752     return sendRegionClose(server, region, versionOfClosingNode, null, true);
753   }
754 
755   /**
756    * Sends a MERGE REGIONS RPC to the specified server to merge the specified
757    * regions.
758    * <p>
759    * A region server could reject the merge request if it does not
760    * have either of the specified regions.
761    * @param server server to merge regions
762    * @param region_a region to merge
763    * @param region_b region to merge
764    * @param forcible true to do a compulsory merge; otherwise we will only merge
765    *          two adjacent regions
766    * @throws IOException
767    */
768   public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
769       HRegionInfo region_b, boolean forcible) throws IOException {
770     if (server == null)
771       throw new NullPointerException("Passed server is null");
772     if (region_a == null || region_b == null)
773       throw new NullPointerException("Passed region is null");
774     AdminService.BlockingInterface admin = getRsAdmin(server);
775     if (admin == null) {
776       throw new IOException("Attempting to send MERGE REGIONS RPC to server "
777           + server.toString() + " for region "
778           + region_a.getRegionNameAsString() + ","
779           + region_b.getRegionNameAsString()
780           + " failed because no RPC connection found to this server");
781     }
782     ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
783   }
784 
785   /**
786    * Check if a region server is reachable and has the expected start code
787    */
788   public boolean isServerReachable(ServerName server) {
789     if (server == null) throw new NullPointerException("Passed server is null");
790 
791     RetryCounter retryCounter = pingRetryCounterFactory.create();
792     while (retryCounter.shouldRetry()) {
793       synchronized (this.onlineServers) {
794         if (this.deadservers.isDeadServer(server)) {
795           return false;
796         }
797       }
798       try {
799         AdminService.BlockingInterface admin = getRsAdmin(server);
800         if (admin != null) {
801           ServerInfo info = ProtobufUtil.getServerInfo(admin);
802           return info != null && info.hasServerName()
803             && server.getStartcode() == info.getServerName().getStartCode();
804         }
805       } catch (RegionServerStoppedException e) {
806         if (LOG.isDebugEnabled()) {
807           LOG.debug("Couldn't reach " + server, e);
808         }
809         break;
810       } catch (ServerNotRunningYetException e) {
811         if (LOG.isDebugEnabled()) {
812           LOG.debug("Couldn't reach " + server, e);
813         }
814         break;
815       } catch (IOException ioe) {
816         if (LOG.isDebugEnabled()) {
817           LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
818               + retryCounter.getMaxAttempts(), ioe);
819         }
820         try {
821           retryCounter.sleepUntilNextRetry();
822         } catch(InterruptedException ie) {
823           Thread.currentThread().interrupt();
824           break;
825         }
826       }
827     }
828     return false;
829   }
830 
831   /**
832    * @param sn
833    * @return Admin interface for the remote regionserver named <code>sn</code>
834    * @throws IOException
835    * @throws RetriesExhaustedException wrapping a ConnectException if failed
836    */
837   private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
838   throws IOException {
839     AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
840     if (admin == null) {
841       LOG.debug("New admin connection to " + sn.toString());
842       admin = this.connection.getAdmin(sn);
843       this.rsAdmins.put(sn, admin);
844     }
845     return admin;
846   }
847 
848   /**
849    * Wait for the region servers to report in.
850    * We will wait until one of these conditions is met:
851    *  - the master is stopped
852    *  - the 'hbase.master.wait.on.regionservers.maxtostart' number of
853    *    region servers is reached
854    *  - the 'hbase.master.wait.on.regionservers.mintostart' is reached AND
855    *    no new region server has checked in for
856    *    'hbase.master.wait.on.regionservers.interval' time AND
857    *    the 'hbase.master.wait.on.regionservers.timeout' is reached
858    *
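       * <p>A hedged tuning sketch for these thresholds (the values below are purely
       * illustrative, not recommendations; {@code master} is assumed to expose the
       * master-side {@code org.apache.hadoop.conf.Configuration}):
       * <pre>{@code
       * Configuration conf = master.getConfiguration();
       * conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);    // default 1
       * conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 10);   // default Integer.MAX_VALUE
       * conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);   // ms, default 4500
       * conf.setLong(ServerManager.WAIT_ON_REGIONSERVERS_INTERVAL, 1500);  // ms, default 1500
       * }</pre>
       *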
859    * @throws InterruptedException
860    */
861   public void waitForRegionServers(MonitoredTask status)
862   throws InterruptedException {
863     final long interval = this.master.getConfiguration().
864       getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
865     final long timeout = this.master.getConfiguration().
866       getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
867     int minToStart = this.master.getConfiguration().
868       getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
869     if (minToStart < 1) {
870       LOG.warn(String.format(
871         "The value of '%s' (%d) can not be less than 1, ignoring.",
872         WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
873       minToStart = 1;
874     }
875     int maxToStart = this.master.getConfiguration().
876       getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
877     if (maxToStart < minToStart) {
878         LOG.warn(String.format(
879             "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
880             WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
881             WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
882         maxToStart = Integer.MAX_VALUE;
883     }
884 
885     long now =  System.currentTimeMillis();
886     final long startTime = now;
887     long slept = 0;
888     long lastLogTime = 0;
889     long lastCountChange = startTime;
890     int count = countOfRegionServers();
891     int oldCount = 0;
892     while (
893       !this.master.isStopped() &&
894         count < maxToStart &&
895         (lastCountChange+interval > now || timeout > slept || count < minToStart)
896       ){
897 
898       // Log some info at every interval time or if there is a change
899       if (oldCount != count || lastLogTime+interval < now){
900         lastLogTime = now;
901         String msg =
902           "Waiting for region servers count to settle; currently"+
903             " checked in " + count + ", slept for " + slept + " ms," +
904             " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
905             ", timeout of "+timeout+" ms, interval of "+interval+" ms.";
906         LOG.info(msg);
907         status.setStatus(msg);
908       }
909 
910       // We sleep for some time
911       final long sleepTime = 50;
912       Thread.sleep(sleepTime);
913       now =  System.currentTimeMillis();
914       slept = now - startTime;
915 
916       oldCount = count;
917       count = countOfRegionServers();
918       if (count != oldCount) {
919         lastCountChange = now;
920       }
921     }
922 
923     LOG.info("Finished waiting for region servers count to settle;" +
924       " checked in " + count + ", slept for " + slept + " ms," +
925       " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
926       " master is "+ (this.master.isStopped() ? "stopped.": "running.")
927     );
928   }
929 
930   /**
931    * @return A copy of the internal list of online servers.
932    */
933   public List<ServerName> getOnlineServersList() {
934     // TODO: optimize the load balancer call so we don't need to make a new list
935     // TODO: FIX. THIS IS POPULAR CALL.
936     return new ArrayList<ServerName>(this.onlineServers.keySet());
937   }
938 
939   /**
940    * @return A copy of the internal list of draining servers.
941    */
942   public List<ServerName> getDrainingServersList() {
943     return new ArrayList<ServerName>(this.drainingServers);
944   }
945 
946   /**
947    * @return A copy of the internal set of deadNotExpired servers.
948    */
949   Set<ServerName> getDeadNotExpiredServers() {
950     return new HashSet<ServerName>(this.queuedDeadServers);
951   }
952 
953   /**
954    * During startup, if we figure it is not a failover, i.e. there are
955    * no more HLog files to split, we won't try to recover these dead servers.
956    * So we just remove them from the queue. Use caution in calling this.
957    */
958   void removeRequeuedDeadServers() {
959     requeuedDeadServers.clear();
960   }
961 
962   /**
963    * @return An unmodifiable view of the internal map of requeuedDeadServers and their corresponding
964    *         splitlog need flag.
965    */
966   Map<ServerName, Boolean> getRequeuedDeadServers() {
967     return Collections.unmodifiableMap(this.requeuedDeadServers);
968   }
969 
970   public boolean isServerOnline(ServerName serverName) {
971     return serverName != null && onlineServers.containsKey(serverName);
972   }
973 
974   /**
975    * Check if a server is known to be dead.  A server can be online,
976    * or known to be dead, or unknown to this manager (i.e., not online and
977    * not known to be dead either; it is simply not tracked by the
978    * master any more, for example, a very old previous instance).
979    */
980   public synchronized boolean isServerDead(ServerName serverName) {
981     return serverName == null || deadservers.isDeadServer(serverName)
982       || queuedDeadServers.contains(serverName)
983       || requeuedDeadServers.containsKey(serverName);
984   }
985 
986   public void shutdownCluster() {
987     this.clusterShutdown = true;
988     this.master.stop("Cluster shutdown requested");
989   }
990 
991   public boolean isClusterShutdown() {
992     return this.clusterShutdown;
993   }
994 
995   /**
996    * Stop the ServerManager.  Currently closes the connection to the cluster.
997    */
998   public void stop() {
999     if (connection != null) {
1000       try {
1001         connection.close();
1002       } catch (IOException e) {
1003         LOG.error("Attempt to close connection to master failed", e);
1004       }
1005     }
1006   }
1007 
1008   /**
1009    * Creates a list of possible destinations for a region. It contains the online servers, but not
1010    *  the draining or dying servers.
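        *  <p>A minimal illustrative call (here {@code serverManager} and {@code serverToExclude}
        *  are placeholders for the caller's own references):
        *  <pre>{@code
        *  List<ServerName> destinations = serverManager.createDestinationServersList(serverToExclude);
        *  }</pre>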
1011    *  @param serverToExclude can be null if there is no server to exclude
1012    */
1013   public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
1014     final List<ServerName> destServers = getOnlineServersList();
1015 
1016     if (serverToExclude != null){
1017       destServers.remove(serverToExclude);
1018     }
1019 
1020     // Loop through the draining server list and remove them from the server list
1021     final List<ServerName> drainingServersCopy = getDrainingServersList();
1022     if (!drainingServersCopy.isEmpty()) {
1023       for (final ServerName server: drainingServersCopy) {
1024         destServers.remove(server);
1025       }
1026     }
1027 
1028     // Remove the deadNotExpired servers from the server list.
1029     removeDeadNotExpiredServers(destServers);
1030 
1031     return destServers;
1032   }
1033 
1034   /**
1035    * Calls {@link #createDestinationServersList} without server to exclude.
1036    */
1037   public List<ServerName> createDestinationServersList(){
1038     return createDestinationServersList(null);
1039   }
1040 
1041   /**
1042    * Loop through the deadNotExpired server list and remove its entries from the
1043    * passed list of servers.
1044    * This function should be used carefully outside of this class. You should use a high-level
1045    * method such as {@link #createDestinationServersList()} instead of managing your own list.
1046    */
1047   void removeDeadNotExpiredServers(List<ServerName> servers) {
1048     Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
1049     if (!deadNotExpiredServersCopy.isEmpty()) {
1050       for (ServerName server : deadNotExpiredServersCopy) {
1051         LOG.debug("Removing dead but not expired server: " + server
1052           + " from eligible server pool.");
1053         servers.remove(server);
1054       }
1055     }
1056   }
1057 
1058   /**
1059    * Clears any dead server that has the same host name and port as an online server.
1060    */
1061   void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1062     for (ServerName serverName : getOnlineServersList()) {
1063       deadservers.cleanAllPreviousInstances(serverName);
1064     }
1065   }
1066 }