1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.NavigableMap;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentSkipListSet;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.ThreadFactory;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.Chore;
51  import org.apache.hadoop.hbase.HBaseIOException;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.NotServingRegionException;
55  import org.apache.hadoop.hbase.RegionTransition;
56  import org.apache.hadoop.hbase.Server;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.Stoppable;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotFoundException;
61  import org.apache.hadoop.hbase.catalog.CatalogTracker;
62  import org.apache.hadoop.hbase.catalog.MetaReader;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.exceptions.DeserializationException;
65  import org.apache.hadoop.hbase.executor.EventHandler;
66  import org.apache.hadoop.hbase.executor.EventType;
67  import org.apache.hadoop.hbase.executor.ExecutorService;
68  import org.apache.hadoop.hbase.ipc.RpcClient;
69  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
70  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
71  import org.apache.hadoop.hbase.master.RegionState.State;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
73  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
74  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
75  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
76  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
77  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
78  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
79  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
80  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
81  import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
82  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
83  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
84  import org.apache.hadoop.hbase.regionserver.SplitTransaction;
85  import org.apache.hadoop.hbase.regionserver.wal.HLog;
86  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
87  import org.apache.hadoop.hbase.util.ConfigUtil;
88  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89  import org.apache.hadoop.hbase.util.FSUtils;
90  import org.apache.hadoop.hbase.util.KeyLocker;
91  import org.apache.hadoop.hbase.util.Pair;
92  import org.apache.hadoop.hbase.util.PairOfSameType;
93  import org.apache.hadoop.hbase.util.Threads;
94  import org.apache.hadoop.hbase.util.Triple;
95  import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
96  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
97  import org.apache.hadoop.hbase.zookeeper.ZKTable;
98  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
99  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
100 import org.apache.hadoop.ipc.RemoteException;
101 import org.apache.zookeeper.AsyncCallback;
102 import org.apache.zookeeper.KeeperException;
103 import org.apache.zookeeper.KeeperException.NoNodeException;
104 import org.apache.zookeeper.KeeperException.NodeExistsException;
105 import org.apache.zookeeper.data.Stat;
106 
107 import com.google.common.annotations.VisibleForTesting;
108 import com.google.common.base.Preconditions;
109 import com.google.common.collect.LinkedHashMultimap;
110 
111 /**
112  * Manages and performs region assignment.
113  * <p>
114  * Monitors ZooKeeper for events related to regions in transition.
115  * <p>
116  * Handles existing regions in transition during master failover.
117  */
118 @InterfaceAudience.Private
119 public class AssignmentManager extends ZooKeeperListener {
120   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
121 
122   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
123       -1, -1L);
124 
125   public static final String ASSIGNMENT_TIMEOUT = "hbase.master.assignment.timeoutmonitor.timeout";
126   public static final int DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT = 600000;
127   public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
128   public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
129 
130   public static final String ALREADY_IN_TRANSITION_WAITTIME
131     = "hbase.assignment.already.intransition.waittime";
132   public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
133 
134   protected final Server server;
135 
136   private ServerManager serverManager;
137 
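      // Set when the configured load balancer is the FavoredNodeLoadBalancer; favored nodes
      // info is then pushed to hbase:meta for assigned regions (see processFavoredNodes).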
138   private boolean shouldAssignRegionsWithFavoredNodes;
139 
140   private CatalogTracker catalogTracker;
141 
142   protected final TimeoutMonitor timeoutMonitor;
143 
144   private final TimerUpdater timerUpdater;
145 
146   private LoadBalancer balancer;
147 
148   private final MetricsAssignmentManager metricsAssignmentManager;
149 
150   private final TableLockManager tableLockManager;
151 
152   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
153 
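      // Per-region locks keyed by encoded region name; used to serialize operations on the same region.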
154   final private KeyLocker<String> locker = new KeyLocker<String>();
155 
156   /**
157    * Map of regions to reopen after the schema of a table is changed. Key -
158    * encoded region name, value - HRegionInfo
159    */
160   private final Map <String, HRegionInfo> regionsToReopen;
161 
162   /*
163    * Maximum times we recurse an assignment/unassignment.
164    * See below in {@link #assign()} and {@link #unassign()}.
165    */
166   private final int maximumAttempts;
167 
168   /**
169    * Map from the region to be created (its encoded name) to the two regions being merged.
170    */
171   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172     = new HashMap<String, PairOfSameType<HRegionInfo>>();
173 
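      // Map from a splitting parent region to its two daughter regions (the split analogue of mergingRegions).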
174   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175   = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176 
177   /**
178    * The sleep time for which the assignment will wait before retrying in case of
179    * hbase:meta assignment failure due to a missing or bad region plan
180    */
181   private final long sleepTimeBeforeRetryingMetaAssignment;
182 
183   /** Plans for region movement. Key is the encoded version of a region name*/
184   // TODO: When do plans get cleaned out?  Ever? In server open and in server
185   // shutdown processing -- St.Ack
186   // All access to this Map must be synchronized.
187   final NavigableMap<String, RegionPlan> regionPlans =
188     new TreeMap<String, RegionPlan>();
189 
190   private final ZKTable zkTable;
191 
192   /**
193    * Contains the servers which need their timers updated; these servers will be
194    * handled by {@link TimerUpdater}
195    */
196   private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
197 
198   private final ExecutorService executorService;
199 
200   // For unit tests, keep track of calls to ClosedRegionHandler
201   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
202 
203   // For unit tests, keep track of calls to OpenedRegionHandler
204   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
205 
206   //Thread pool executor service for timeout monitor
207   private java.util.concurrent.ExecutorService threadPoolExecutorService;
208 
209   // A bunch of ZK events workers. Each is a single thread executor service
210   private final java.util.concurrent.ExecutorService zkEventWorkers;
211 
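      // ZK event types that are still handled even if the reporting region server is no longer online.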
212   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
213       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
214 
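      // In-memory bookkeeping of region states and region-to-server assignments.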
215   private final RegionStates regionStates;
216 
217   // The thresholds for using bulk assignment. Bulk assignment is used
218   // only if assigning at least this many regions to at least this
219   // many servers. If assigning fewer regions to fewer servers,
220   // bulk assigning may not be as efficient.
221   private final int bulkAssignThresholdRegions;
222   private final int bulkAssignThresholdServers;
223 
224   // Should bulk assignment wait till all regions are assigned,
225   // or until it times out?  This is useful for measuring bulk assignment
226   // performance, but not needed in most use cases.
227   private final boolean bulkAssignWaitTillAllAssigned;
228 
229   /**
230    * Indicator that AssignmentManager has recovered the region states so
231    * that ServerShutdownHandler can be fully enabled and re-assign regions
232    * of dead servers. This ensures that when re-assignment happens, AssignmentManager
233    * has proper region states.
234    *
235    * Protected to ease testing.
236    */
237   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
238 
239   /** Is the TimeOutManagement activated **/
240   private final boolean tomActivated;
241 
242   /**
243    * A map to track the number of times a region fails to open in a row,
244    * so that we don't try to open a region forever if the failure is
245    * unrecoverable.  We don't put this information in region states
246    * because we don't expect this to happen frequently; we don't
247    * want to copy this information over during each state transition either.
248    */
249   private final ConcurrentHashMap<String, AtomicInteger>
250     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
251 
252   // A flag to indicate if we are using ZK for region assignment
253   private final boolean useZKForAssignment;
254 
255   // In case not using ZK for region assignment, region states
256   // are persisted in meta with a state store
257   private final RegionStateStore regionStateStore;
258 
259   /**
260    * For testing only!  Set to true to skip handling of split.
261    */
262   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
263   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
264 
265   /** Listeners that are called on assignment events. */
266   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
267 
268   /**
269    * Constructs a new assignment manager.
270    *
271    * @param server
272    * @param serverManager
273    * @param catalogTracker
274    * @param service
275    * @throws KeeperException
276    * @throws IOException
277    */
278   public AssignmentManager(Server server, ServerManager serverManager,
279       CatalogTracker catalogTracker, final LoadBalancer balancer,
280       final ExecutorService service, MetricsMaster metricsMaster,
281       final TableLockManager tableLockManager) throws KeeperException, IOException {
282     super(server.getZooKeeper());
283     this.server = server;
284     this.serverManager = serverManager;
285     this.catalogTracker = catalogTracker;
286     this.executorService = service;
287     this.regionStateStore = new RegionStateStore(server);
288     this.regionsToReopen = Collections.synchronizedMap
289                            (new HashMap<String, HRegionInfo> ());
290     Configuration conf = server.getConfiguration();
291     // Only read favored nodes if using the favored nodes load balancer.
292     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294            FavoredNodeLoadBalancer.class);
295     this.tomActivated = conf.getBoolean(
296       ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
297     if (tomActivated){
298       this.serversInUpdatingTimer =  new ConcurrentSkipListSet<ServerName>();
299       this.timeoutMonitor = new TimeoutMonitor(
300         conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
301         server, serverManager,
302         conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT));
303       this.timerUpdater = new TimerUpdater(conf.getInt(
304         "hbase.master.assignment.timerupdater.period", 10000), server);
305       Threads.setDaemonThreadRunning(timerUpdater.getThread(),
306         server.getServerName() + ".timerUpdater");
307     } else {
308       this.serversInUpdatingTimer =  null;
309       this.timeoutMonitor = null;
310       this.timerUpdater = null;
311     }
312     this.zkTable = new ZKTable(this.watcher);
313     // This is the max attempts, not retries, so it should be at least 1.
314     this.maximumAttempts = Math.max(1,
315       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
316     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
317         "hbase.meta.assignment.retry.sleeptime", 1000l);
318     this.balancer = balancer;
319     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
320     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
321       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
322     this.regionStates = new RegionStates(server, serverManager, regionStateStore);
323 
324     this.bulkAssignWaitTillAllAssigned =
325       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
326     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
327     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
328 
329     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
330     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
331     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
332             TimeUnit.SECONDS, threadFactory);
333     this.tableLockManager = tableLockManager;
334 
335     this.metricsAssignmentManager = new MetricsAssignmentManager();
336     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
337   }
338 
339   void startTimeOutMonitor() {
340     if (tomActivated) {
341       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
342           + ".timeoutMonitor");
343     }
344   }
345 
346   /**
347    * Add the listener to the notification list.
348    * @param listener The AssignmentListener to register
349    */
350   public void registerListener(final AssignmentListener listener) {
351     this.listeners.add(listener);
352   }
353 
354   /**
355    * Remove the listener from the notification list.
356    * @param listener The AssignmentListener to unregister
357    */
358   public boolean unregisterListener(final AssignmentListener listener) {
359     return this.listeners.remove(listener);
360   }
361 
362   /**
363    * @return Instance of ZKTable.
364    */
365   public ZKTable getZKTable() {
366     // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
367     // sharing.
368     return this.zkTable;
369   }
370 
371   /**
372    * This SHOULD not be public. It is public now
373    * because of some unit tests.
374    *
375    * TODO: make it package private and keep RegionStates in the master package
376    */
377   public RegionStates getRegionStates() {
378     return regionStates;
379   }
380 
381   /**
382    * Used in some tests to mock up region state in meta
383    */
384   @VisibleForTesting
385   RegionStateStore getRegionStateStore() {
386     return regionStateStore;
387   }
388 
389   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
390     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
391   }
392 
393   /**
394    * Add a regionPlan for the specified region.
395    * @param encodedName
396    * @param plan
397    */
398   public void addPlan(String encodedName, RegionPlan plan) {
399     synchronized (regionPlans) {
400       regionPlans.put(encodedName, plan);
401     }
402   }
403 
404   /**
405    * Add a map of region plans.
406    */
407   public void addPlans(Map<String, RegionPlan> plans) {
408     synchronized (regionPlans) {
409       regionPlans.putAll(plans);
410     }
411   }
412 
413   /**
414    * Set the list of regions that will be reopened
415    * because of an update in table schema
416    *
417    * @param regions
418    *          list of regions that should be tracked for reopen
419    */
420   public void setRegionsToReopen(List <HRegionInfo> regions) {
421     for(HRegionInfo hri : regions) {
422       regionsToReopen.put(hri.getEncodedName(), hri);
423     }
424   }
425 
426   /**
427    * Used by the client to identify if all regions have the schema updates
428    *
429    * @param tableName
430    * @return Pair indicating the status of the alter command
431    * @throws IOException
432    */
433   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
434       throws IOException {
435     List <HRegionInfo> hris =
436       MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
437     Integer pending = 0;
438     for (HRegionInfo hri : hris) {
439       String name = hri.getEncodedName();
440       // no lock; concurrent access ok: sequential consistency respected.
441       if (regionsToReopen.containsKey(name)
442           || regionStates.isRegionInTransition(name)) {
443         pending++;
444       }
445     }
446     return new Pair<Integer, Integer>(pending, hris.size());
447   }
448 
449   /**
450    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
451    * the failover cleanup before re-assigning regions of dead servers. So that
452    * when re-assignment happens, AssignmentManager has proper region states.
453    */
454   public boolean isFailoverCleanupDone() {
455     return failoverCleanupDone.get();
456   }
457 
458   /**
459    * To avoid racing with AM, external entities may need to lock a region,
460    * for example, when SSH checks what regions to skip re-assigning.
461    */
462   public Lock acquireRegionLock(final String encodedName) {
463     return locker.acquireLock(encodedName);
464   }
465 
466   /**
467    * Now, failover cleanup is completed. Notify the server manager to
468    * process queued-up dead servers, if any.
469    */
470   void failoverCleanupDone() {
471     failoverCleanupDone.set(true);
472     serverManager.processQueuedDeadServers();
473   }
474 
475   /**
476    * Called on startup.
477    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
478    * @throws IOException
479    * @throws KeeperException
480    * @throws InterruptedException
481    */
482   void joinCluster() throws IOException,
483       KeeperException, InterruptedException {
484     long startTime = System.currentTimeMillis();
485     // Concurrency note: In the below the accesses on regionsInTransition are
486     // outside of a synchronization block where usually all accesses to RIT are
487     // synchronized.  The presumption is that in this case it is safe since this
488     // method is being run by a single thread on startup.
489 
490     // TODO: Regions that have a null location and are not in regionsInTransitions
491     // need to be handled.
492 
493     // Scan hbase:meta to build list of existing regions, servers, and assignment
494     // Returns servers who have not checked in (assumed dead) and their regions
495     Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
496 
497     // This method will assign all user regions on a clean server startup; otherwise
498     // it will reconstruct master state and clean up any leftovers from the
499     // previous master process.
500     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
501 
502     if (!useZKForAssignment) {
503       // Not using ZK for assignment any more, remove the ZNode
504       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
505     }
506     recoverTableInDisablingState();
507     recoverTableInEnablingState();
508     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
509       - startTime) + "ms, failover=" + failover);
510   }
511 
512   /**
513    * Processes all regions that are in transition in zookeeper and also
514    * processes the list of dead servers by scanning the META.
515    * Used by a master joining a cluster.  If we figure this is a clean cluster
516    * startup, will assign all user regions.
517    * @param deadServers
518    *          Map of dead servers and their regions. Can be null.
519    * @throws KeeperException
520    * @throws IOException
521    * @throws InterruptedException
522    */
523   boolean processDeadServersAndRegionsInTransition(
524       final Map<ServerName, List<HRegionInfo>> deadServers)
525           throws KeeperException, IOException, InterruptedException {
526     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
527       watcher.assignmentZNode);
528 
529     if (nodes == null && useZKForAssignment) {
530       String errorMessage = "Failed to get the children from ZK";
531       server.abort(errorMessage, new IOException(errorMessage));
532       return true; // Doesn't matter in this case
533     }
534 
535     boolean failover = !serverManager.getDeadServers().isEmpty();
536     if (failover) {
537       // This may not be a failover actually, especially if meta is on this master.
538       if (LOG.isDebugEnabled()) {
539         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
540       }
541     } else {
542       // If any one region except meta is assigned, it's a failover.
543       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
544       for (Map.Entry<HRegionInfo, ServerName> en : regionStates.getRegionAssignments().entrySet()) {
545         HRegionInfo hri = en.getKey();
546         if (!hri.isMetaTable() && onlineServers.contains(en.getValue())) {
547           LOG.debug("Found " + hri + " out on cluster");
548           failover = true;
549           break;
550         }
551       }
552     }
553 
554     if (!failover && nodes != null) {
555       // If any one region except meta is in transition, it's a failover.
556       for (String encodedName : nodes) {
557         RegionState regionState = regionStates.getRegionState(encodedName);
558         if (regionState != null && !regionState.getRegion().isMetaRegion()) {
559           LOG.debug("Found " + regionState + " in RITs");
560           failover = true;
561           break;
562         }
563       }
564     }
565 
566     if (!failover && !useZKForAssignment) {
567       // If any region except meta is in transition on a live server, it's a failover.
568       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
569       if (!regionsInTransition.isEmpty()) {
570         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
571         for (RegionState regionState : regionsInTransition.values()) {
572           ServerName serverName = regionState.getServerName();
573           if (!regionState.getRegion().isMetaRegion()
574               && serverName != null && onlineServers.contains(serverName)) {
575             LOG.debug("Found " + regionState + " in RITs");
576             failover = true;
577             break;
578           }
579         }
580       }
581     }
582 
583     if (!failover) {
584       // If we get here, we have a full cluster restart. It is a failover only
585       // if there are some HLogs that are not split yet. For meta HLogs, they should have
586       // been split already, if any. We can walk through those queued dead servers:
587       // if they don't have any HLogs, this restart should be considered a clean one
588       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
589       if (!queuedDeadServers.isEmpty()) {
590         Configuration conf = server.getConfiguration();
591         Path rootdir = FSUtils.getRootDir(conf);
592         FileSystem fs = rootdir.getFileSystem(conf);
593         for (ServerName serverName : queuedDeadServers) {
594           Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
595           Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
596           if (fs.exists(logDir) || fs.exists(splitDir)) {
597             LOG.debug("Found queued dead server " + serverName);
598             failover = true;
599             break;
600           }
601         }
602         if (!failover) {
603           // We figured that it's not a failover, so no need to
604           // work on these re-queued dead servers any more.
605           LOG.info("AM figured that it's not a failover and cleaned up " + queuedDeadServers.size()
606               + " queued dead servers");
607           serverManager.removeRequeuedDeadServers();
608         }
609       }
610     }
611 
612     Set<TableName> disabledOrDisablingOrEnabling = null;
613     if (!failover) {
614       disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
615       disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
616       // Clean re/start, mark all user regions closed before reassignment
617       // TODO -Hbase-11319
618       regionStates.closeAllUserRegions(disabledOrDisablingOrEnabling);
619     }
620 
621     // Now region states are restored
622     regionStateStore.start();
623 
624     // If we found user regions out on cluster, it's a failover.
625     if (failover) {
626       LOG.info("Found regions out on cluster or in RIT; presuming failover");
627       // Process list of dead servers and regions in RIT.
628       // See HBASE-4580 for more information.
629       processDeadServersAndRecoverLostRegions(deadServers);
630     } 
631     if (!failover && useZKForAssignment) {
632       // Cleanup any existing ZK nodes and start watching
633       ZKAssign.deleteAllNodes(watcher);
634       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.assignmentZNode);
635     }
636     // Now we can safely claim failover cleanup completed and enable
637     // ServerShutdownHandler for further processing. The nodes (below)
638     // in transition, if any, are for regions not related to those
639     // dead servers at all, and can be done in parallel to SSH.
640     failoverCleanupDone();
641     if (!failover) {
642       // Fresh cluster startup.
643       LOG.info("Clean cluster startup. Assigning user regions");
644       assignAllUserRegions(disabledOrDisablingOrEnabling);
645     }
646     return failover;
647   }
648 
649   /**
650    * If region is up in zk in transition, then do fixup and block and wait until
651    * the region is assigned and out of transition.  Used on startup for
652    * catalog regions.
653    * @param hri Region to look for.
654    * @return True if we processed a region in transition else false if region
655    * was not up in zk in transition.
656    * @throws InterruptedException
657    * @throws KeeperException
658    * @throws IOException
659    */
660   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
661       throws InterruptedException, KeeperException, IOException {
662     String encodedRegionName = hri.getEncodedName();
663     if (!processRegionInTransition(encodedRegionName, hri)) {
664       return false; // The region is not in transition
665     }
666     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
667     while (!this.server.isStopped() &&
668         this.regionStates.isRegionInTransition(encodedRegionName)) {
669       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
670       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
671         // The region is not in transition, or not in transition on an online
672         // server. Doesn't help to block here any more. Caller needs to
673         // verify the region is actually assigned.
674         break;
675       }
676       this.regionStates.waitForUpdate(100);
677     }
678     return true;
679   }
680 
681   /**
682    * Process failover of new master for region <code>encodedRegionName</code>
683    * up in zookeeper.
684    * @param encodedRegionName Region to process failover for.
685    * @param regionInfo If null we'll go get it from meta table.
686    * @return True if we processed <code>regionInfo</code> as a RIT.
687    * @throws KeeperException
688    * @throws IOException
689    */
690   boolean processRegionInTransition(final String encodedRegionName,
691       final HRegionInfo regionInfo) throws KeeperException, IOException {
692     // We need a lock here to ensure that we will not put the same region twice
693     // It has no reason to be a lock shared with the other operations.
694     // We can do the lock on the region only, instead of a global lock: what we want to ensure
695     // is that we don't have two threads working on the same region.
696     Lock lock = locker.acquireLock(encodedRegionName);
697     try {
698       Stat stat = new Stat();
699       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
700       if (data == null) return false;
701       RegionTransition rt;
702       try {
703         rt = RegionTransition.parseFrom(data);
704       } catch (DeserializationException e) {
705         LOG.warn("Failed parse znode data", e);
706         return false;
707       }
708       HRegionInfo hri = regionInfo;
709       if (hri == null) {
710         // The region info is not passed in. We will try to find the region
711         // from region states map/meta based on the encoded region name. But we
712         // may not be able to find it. This is valid for online merge, where
713         // the region may not have been created yet if the merge is not completed.
714         // Therefore, it is not in meta at master recovery time.
715         hri = regionStates.getRegionInfo(rt.getRegionName());
716         EventType et = rt.getEventType();
717         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
718             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
719           LOG.warn("Couldn't find the region in recovering " + rt);
720           return false;
721         }
722       }
723       return processRegionsInTransition(
724         rt, hri, stat.getVersion());
725     } finally {
726       lock.unlock();
727     }
728   }
729 
730   /**
731    * This call is invoked only (1) when the master assigns meta;
732    * (2) during failover-mode startup, for zk assignment node processing.
733    * The locker is set in the caller. It returns true if the region
734    * is in transition for sure, false otherwise.
735    *
736    * It should be private but it is used by some tests too.
737    */
738   boolean processRegionsInTransition(
739       final RegionTransition rt, final HRegionInfo regionInfo,
740       final int expectedVersion) throws KeeperException {
741     EventType et = rt.getEventType();
742     // Get ServerName.  Cannot be null.
743     final ServerName sn = rt.getServerName();
744     final byte[] regionName = rt.getRegionName();
745     final String encodedName = HRegionInfo.encodeRegionName(regionName);
746     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
747     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
748 
749     if (regionStates.isRegionInTransition(encodedName)
750         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
751       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
752         + et + ", does nothing since the region is already in transition "
753         + regionStates.getRegionTransitionState(encodedName));
754       // Just return
755       return true;
756     }
757     if (!serverManager.isServerOnline(sn)) {
758       // It was transitioning on a dead server, so it's closed now.
759       // Force to OFFLINE and put it in transition, but not assign it
760       // since log splitting for the dead server is not done yet.
761       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
762         " was on deadserver; forcing offline");
763       if (regionStates.isRegionOnline(regionInfo)) {
764         // Meta could still show the region is assigned to the previous
765         // server. If that server is online, when we reload the meta, the
766         // region is put back online, so we need to offline it.
767         regionStates.regionOffline(regionInfo);
768         sendRegionClosedNotification(regionInfo);
769       }
770       // Put it back in transition so that SSH can re-assign it
771       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
772 
773       if (regionInfo.isMetaRegion()) {
774         // If it's meta region, reset the meta location.
775         // So that master knows the right meta region server.
776         MetaRegionTracker.setMetaLocation(watcher, sn, State.OPEN);
777       } else {
778         // No matter whether the previous server is online or offline,
779         // we need to reset the last region server of the region.
780         regionStates.setLastRegionServerOfRegion(sn, encodedName);
781         // Make sure we know the server is dead.
782         if (!serverManager.isServerDead(sn)) {
783           serverManager.expireServer(sn);
784         }
785       }
786       return false;
787     }
788     switch (et) {
789       case M_ZK_REGION_CLOSING:
790         // Insert into RIT & resend the query to the region server: maybe the previous master
791         // died before sending the query the first time.
792         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
793         this.executorService.submit(
794           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
795             @Override
796             public void process() throws IOException {
797               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
798               try {
799                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
800                 if (regionStates.isRegionOffline(regionInfo)) {
801                   assign(regionInfo, true);
802                 }
803               } finally {
804                 lock.unlock();
805               }
806             }
807           });
808         break;
809 
810       case RS_ZK_REGION_CLOSED:
811       case RS_ZK_REGION_FAILED_OPEN:
812         // Region is closed, insert into RIT and handle it
813         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
814         invokeAssign(regionInfo);
815         break;
816 
817       case M_ZK_REGION_OFFLINE:
818         // Insert in RIT and resend to the regionserver
819         regionStates.updateRegionState(rt, State.PENDING_OPEN);
820         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
821         this.executorService.submit(
822           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
823             @Override
824             public void process() throws IOException {
825               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
826               try {
827                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
828                 addPlan(encodedName, plan);
829                 assign(rsOffline, false, false);
830               } finally {
831                 lock.unlock();
832               }
833             }
834           });
835         break;
836 
837       case RS_ZK_REGION_OPENING:
838         regionStates.updateRegionState(rt, State.OPENING);
839         break;
840 
841       case RS_ZK_REGION_OPENED:
842         // Region is opened, insert into RIT and handle it
843         // This could be done asynchronously, we would need then to acquire the lock in the
844         //  handler.
845         regionStates.updateRegionState(rt, State.OPEN);
846         new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
847         break;
848       case RS_ZK_REQUEST_REGION_SPLIT:
849       case RS_ZK_REGION_SPLITTING:
850       case RS_ZK_REGION_SPLIT:
851         // Splitting region should be online. We could have skipped it during
852         // user region rebuilding since we may have considered the split completed.
853         // Put it in SPLITTING state to avoid complications.
854         regionStates.regionOnline(regionInfo, sn);
855         regionStates.updateRegionState(rt, State.SPLITTING);
856         if (!handleRegionSplitting(
857             rt, encodedName, prettyPrintedRegionName, sn)) {
858           deleteSplittingNode(encodedName, sn);
859         }
860         break;
861       case RS_ZK_REQUEST_REGION_MERGE:
862       case RS_ZK_REGION_MERGING:
863       case RS_ZK_REGION_MERGED:
864         if (!handleRegionMerging(
865             rt, encodedName, prettyPrintedRegionName, sn)) {
866           deleteMergingNode(encodedName, sn);
867         }
868         break;
869       default:
870         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
871     }
872     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
873       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
874       + "server: " + sn);
875     return true;
876   }
877 
878   /**
879    * When a region is closed, it should be removed from the regionsToReopen
880    * @param hri HRegionInfo of the region which was closed
881    */
882   public void removeClosedRegion(HRegionInfo hri) {
883     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
884       LOG.debug("Removed region from reopening regions because it was closed");
885     }
886   }
887 
888   /**
889    * Handles various states an unassigned node can be in.
890    * <p>
891    * Method is called when a state change is suspected for an unassigned node.
892    * <p>
893    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
894    * yet).
895    * @param rt
896    * @param expectedVersion
897    */
898   void handleRegion(final RegionTransition rt, int expectedVersion) {
899     if (rt == null) {
900       LOG.warn("Unexpected NULL input for RegionTransition rt");
901       return;
902     }
903     final ServerName sn = rt.getServerName();
904     // Check if this is a special HBCK transition
905     if (sn.equals(HBCK_CODE_SERVERNAME)) {
906       handleHBCK(rt);
907       return;
908     }
909     final long createTime = rt.getCreateTime();
910     final byte[] regionName = rt.getRegionName();
911     String encodedName = HRegionInfo.encodeRegionName(regionName);
912     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
913     // Verify this is a known server
914     if (!serverManager.isServerOnline(sn)
915       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
916       LOG.warn("Attempted to handle region transition for server but " +
917         "it is not online: " + prettyPrintedRegionName + ", " + rt);
918       return;
919     }
920 
921     RegionState regionState =
922       regionStates.getRegionState(encodedName);
923     long startTime = System.currentTimeMillis();
924     if (LOG.isDebugEnabled()) {
925       boolean lateEvent = createTime < (startTime - 15000);
926       LOG.debug("Handling " + rt.getEventType() +
927         ", server=" + sn + ", region=" +
928         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
929         (lateEvent ? ", which is more than 15 seconds late" : "") +
930         ", current_state=" + regionState);
931     }
932     // We don't do anything for this event,
933     // so separate it out, no need to lock/unlock anything
934     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
935       return;
936     }
937 
938     // We need a lock on the region as we could update it
939     Lock lock = locker.acquireLock(encodedName);
940     try {
941       RegionState latestState =
942         regionStates.getRegionState(encodedName);
943       if ((regionState == null && latestState != null)
944           || (regionState != null && latestState == null)
945           || (regionState != null && latestState != null
946             && latestState.getState() != regionState.getState())) {
947         LOG.warn("Region state changed from " + regionState + " to "
948           + latestState + ", while acquiring lock");
949       }
950       long waitedTime = System.currentTimeMillis() - startTime;
951       if (waitedTime > 5000) {
952         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
953       }
954       regionState = latestState;
955       switch (rt.getEventType()) {
956       case RS_ZK_REQUEST_REGION_SPLIT:
957       case RS_ZK_REGION_SPLITTING:
958       case RS_ZK_REGION_SPLIT:
959         if (!handleRegionSplitting(
960             rt, encodedName, prettyPrintedRegionName, sn)) {
961           deleteSplittingNode(encodedName, sn);
962         }
963         break;
964 
965       case RS_ZK_REQUEST_REGION_MERGE:
966       case RS_ZK_REGION_MERGING:
967       case RS_ZK_REGION_MERGED:
968         // Merged region is a new region, we can't find it in the region states now.
969         // However, the two merging regions are not new. They should be in state for merging.
970         if (!handleRegionMerging(
971             rt, encodedName, prettyPrintedRegionName, sn)) {
972           deleteMergingNode(encodedName, sn);
973         }
974         break;
975 
976       case M_ZK_REGION_CLOSING:
977         // Should see CLOSING after we have asked it to CLOSE or additional
978         // times after already being in state of CLOSING
979         if (regionState == null
980             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
981           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
982             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
983             + regionStates.getRegionState(encodedName));
984           return;
985         }
986         // Transition to CLOSING (or update stamp if already CLOSING)
987         regionStates.updateRegionState(rt, State.CLOSING);
988         break;
989 
990       case RS_ZK_REGION_CLOSED:
991         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
992         if (regionState == null
993             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
994           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
995             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
996             + regionStates.getRegionState(encodedName));
997           return;
998         }
999         // Handle CLOSED by assigning elsewhere or stopping if a disable
1000         // If we got here all is good.  Need to update RegionState -- else
1001         // what follows will fail because not in expected state.
1002         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1003         updateClosedRegionHandlerTracker(regionState.getRegion());
1004         break;
1005 
1006         case RS_ZK_REGION_FAILED_OPEN:
1007           if (regionState == null
1008               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1009             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1010               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1011               + regionStates.getRegionState(encodedName));
1012             return;
1013           }
1014           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1015           if (failedOpenCount == null) {
1016             failedOpenCount = new AtomicInteger();
1017             // No need to use putIfAbsent, or extra synchronization since
1018             // this whole handleRegion block is locked on the encoded region
1019             // name, and failedOpenTracker is updated only in this block
1020             failedOpenTracker.put(encodedName, failedOpenCount);
1021           }
1022           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1023             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1024             // remove the tracking info to save memory, also reset
1025             // the count for next open initiative
1026             failedOpenTracker.remove(encodedName);
1027           } else {
1028             // Handle this the same as if it were opened and then closed.
1029             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1030             if (regionState != null) {
1031              // When there is more than one region server, a new RS is selected as the
1032               // destination and the same is updated in the regionplan. (HBASE-5546)
1033               try {
1034                 getRegionPlan(regionState.getRegion(), sn, true);
1035                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1036               } catch (HBaseIOException e) {
1037                 LOG.warn("Failed to get region plan", e);
1038               }
1039             }
1040           }
1041           break;
1042 
1043         case RS_ZK_REGION_OPENING:
1044           // Should see OPENING after we have asked it to OPEN or additional
1045           // times after already being in state of OPENING
1046           if (regionState == null
1047               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1048             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1049               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1050               + regionStates.getRegionState(encodedName));
1051             return;
1052           }
1053           // Transition to OPENING (or update stamp if already OPENING)
1054           regionStates.updateRegionState(rt, State.OPENING);
1055           break;
1056 
1057         case RS_ZK_REGION_OPENED:
1058           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1059           if (regionState == null
1060               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1061             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1062               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1063               + regionStates.getRegionState(encodedName));
1064 
1065             if (regionState != null) {
1066               // Close it without updating the internal region states,
1067               // so as not to create double assignments in unlucky scenarios
1068               // mentioned in OpenRegionHandler#process
1069               unassign(regionState.getRegion(), null, -1, null, false, sn);
1070             }
1071             return;
1072           }
1073          // Handle OPENED by removing from transition and deleting the zk node
1074           regionState = 
1075               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1076           if (regionState != null) {
1077             failedOpenTracker.remove(encodedName); // reset the count, if any
1078             new OpenedRegionHandler(
1079               server, this, regionState.getRegion(), sn, expectedVersion).process();
1080             updateOpenedRegionHandlerTracker(regionState.getRegion());
1081           }
1082           break;
1083 
1084         default:
1085           throw new IllegalStateException("Received event is not valid.");
1086       }
1087     } finally {
1088       lock.unlock();
1089     }
1090   }
1091 
1092   //For unit tests only
1093   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1094     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1095    //compareAndSet to be sure that unit tests don't see stale values. Means
1096    //we will return true exactly once unless the handler code resets this
1097    //value to true.
1098     return b == null ? false : b.compareAndSet(true, false);
1099   }
1100 
1101   //For unit tests only
1102   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1103     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1104    //compareAndSet to be sure that unit tests don't see stale values. Means
1105    //we will return true exactly once unless the handler code resets this
1106    //value to true.
1107     return b == null ? false : b.compareAndSet(true, false);
1108   }
1109 
1110   //For unit tests only
1111   void initializeHandlerTrackers() {
1112     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1113     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1114   }
1115 
1116   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1117     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1118       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1119     }
1120   }
1121 
1122   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1123     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1124       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1125     }
1126   }
1127 
1128  // TODO: processFavoredNodes might throw an exception, e.g., if the
1129  // meta could not be contacted/updated. We need to see how seriously to treat
1130  // this problem. Should we fail the current assignment? We should be able
1131  // to recover from this problem eventually (if the meta couldn't be updated,
1132  // things should work normally and eventually get fixed up).
1133   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1134     if (!shouldAssignRegionsWithFavoredNodes) return;
1135     // The AM gets the favored nodes info for each region and updates the meta
1136     // table with that info
1137     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1138         new HashMap<HRegionInfo, List<ServerName>>();
1139     for (HRegionInfo region : regions) {
1140       regionToFavoredNodes.put(region,
1141           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1142     }
1143     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1144   }
1145 
1146   /**
1147    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1148    * <p>
1149    * This is handled in a separate code path because it breaks the normal rules.
1150    * @param rt
1151    */
1152   private void handleHBCK(RegionTransition rt) {
1153     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1154     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1155       ", server=" + rt.getServerName() + ", region=" +
1156       HRegionInfo.prettyPrint(encodedName));
1157     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1158     switch (rt.getEventType()) {
1159       case M_ZK_REGION_OFFLINE:
1160         HRegionInfo regionInfo;
1161         if (regionState != null) {
1162           regionInfo = regionState.getRegion();
1163         } else {
1164           try {
1165             byte [] name = rt.getRegionName();
1166             Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1167             regionInfo = p.getFirst();
1168           } catch (IOException e) {
1169             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1170             return;
1171           }
1172         }
1173         LOG.info("HBCK repair is triggering assignment of region=" +
1174             regionInfo.getRegionNameAsString());
1175         // trigger assign, node is already in OFFLINE so don't need to update ZK
1176         assign(regionInfo, false);
1177         break;
1178 
1179       default:
1180         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1181         break;
1182     }
1183 
1184   }
1185 
1186   // ZooKeeper events
1187 
1188   /**
1189    * New unassigned node has been created.
1190    *
1191    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1192    * creating an unassigned node.
1193    *
1194    * <p>When this happens we must:
1195    * <ol>
1196    *   <li>Watch the node for further events</li>
1197    *   <li>Read and handle the state in the node</li>
1198    * </ol>
1199    */
1200   @Override
1201   public void nodeCreated(String path) {
1202     handleAssignmentEvent(path);
1203   }
1204 
1205   /**
1206    * Existing unassigned node has had data changed.
1207    *
1208    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1209    * OPENING/OPENED and CLOSING/CLOSED.
1210    *
1211    * <p>When this happens we must:
1212    * <ol>
1213    *   <li>Watch the node for further events</li>
1214    *   <li>Read and handle the state in the node</li>
1215    * </ol>
1216    */
1217   @Override
1218   public void nodeDataChanged(String path) {
1219     handleAssignmentEvent(path);
1220   }
1221 
1222 
1223  // We don't want to have two events on the same region managed simultaneously.
1224   // For this reason, we need to wait if an event on the same region is currently in progress.
1225   // So we track the region names of the events in progress, and we keep a waiting list.
1226   private final Set<String> regionsInProgress = new HashSet<String>();
1227   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1228   //  this as we want the events to be managed in the same order as we received them.
1229   private final LinkedHashMultimap <String, RegionRunnable>
1230       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1231 
1232   /**
1233    * A specific runnable that works only on a region.
1234    */
1235   private interface RegionRunnable extends Runnable{
1236     /**
1237      * @return - the name of the region it works on.
1238      */
1239     String getRegionName();
1240   }
1241 
1242   /**
1243    * Submit a task, ensuring that there is only one task at a time that working on a given region.
1244    * Order is respected.
1245    */
1246   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1247 
1248     synchronized (regionsInProgress) {
1249      // If there is already a task for this region, we add it to the
1250       //  waiting list and return.
1251       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1252         synchronized (zkEventWorkerWaitingList){
1253           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1254         }
1255         return;
1256       }
1257 
1258       // No event in progress on this region => we can submit a new task immediately.
1259       regionsInProgress.add(regRunnable.getRegionName());
1260       zkEventWorkers.submit(new Runnable() {
1261         @Override
1262         public void run() {
1263           try {
1264             regRunnable.run();
1265           } finally {
1266             // now that we have finished, let's see if there is an event for the same region in the
1267             //  waiting list. If it's the case, we can now submit it to the pool.
1268             synchronized (regionsInProgress) {
1269               regionsInProgress.remove(regRunnable.getRegionName());
1270               synchronized (zkEventWorkerWaitingList) {
1271                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1272                     regRunnable.getRegionName());
1273                 if (!waiting.isEmpty()) {
1274                   // We want the first object only. The only way to get it is through an iterator.
1275                   RegionRunnable toSubmit = waiting.iterator().next();
1276                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1277                   zkEventWorkersSubmit(toSubmit);
1278                 }
1279               }
1280             }
1281           }
1282         }
1283       });
1284     }
1285   }
1286 
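  // A minimal usage sketch of the per-region serialization above (illustrative only;
  // regionName is assumed to be a String already in scope, as in nodeDeleted below):
  //
  //   zkEventWorkersSubmit(new RegionRunnable() {
  //     @Override public String getRegionName() { return regionName; }
  //     @Override public void run() { /* handle the ZK event for this region */ }
  //   });
  //
  // If a task for the same region is already running, the new runnable is parked in
  // zkEventWorkerWaitingList and resubmitted from the finally block above once the
  // running task finishes, so events for one region are processed in arrival order.
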
1287   @Override
1288   public void nodeDeleted(final String path) {
1289     if (path.startsWith(watcher.assignmentZNode)) {
1290       final String regionName = ZKAssign.getRegionName(watcher, path);
1291       zkEventWorkersSubmit(new RegionRunnable() {
1292         @Override
1293         public String getRegionName() {
1294           return regionName;
1295         }
1296 
1297         @Override
1298         public void run() {
1299           Lock lock = locker.acquireLock(regionName);
1300           try {
1301             RegionState rs = regionStates.getRegionTransitionState(regionName);
1302             if (rs == null) {
1303               rs = regionStates.getRegionState(regionName);
1304               if (rs == null || !rs.isMergingNew()) {
1305                 // MergingNew is an offline state
1306                 return;
1307               }
1308             }
1309 
1310             HRegionInfo regionInfo = rs.getRegion();
1311             String regionNameStr = regionInfo.getRegionNameAsString();
1312             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1313             boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
1314             ServerName serverName = rs.getServerName();
1315             if (serverManager.isServerOnline(serverName)) {
1316               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1317                 synchronized (regionStates) {
1318                   regionOnline(regionInfo, serverName);
1319                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1320                     // Check if the daughter regions are still there, if they are present, offline
1321                     // them, as it's the rollback case.
1322                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1323                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1324                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1325                       LOG.warn("Split daughter region not in transition " + hri_a);
1326                     }
1327                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1328                       LOG.warn("Split daughter region not in transition " + hri_b);
1329                     }
1330                     regionOffline(hri_a);
1331                     regionOffline(hri_b);
1332                     splitRegions.remove(regionInfo);
1333                   }
1334                   if (disabled) {
1335                     // if server is offline, no hurt to unassign again
1336                     LOG.info("Opened " + regionNameStr
1337                         + " but this table is disabled, triggering close of region");
1338                     unassign(regionInfo);
1339                   }
1340                 }
1341               } else if (rs.isMergingNew()) {
1342                 synchronized (regionStates) {
1343                   String p = regionInfo.getEncodedName();
1344                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1345                   if (regions != null) {
1346                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1347                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1348                   }
1349                 }
1350               }
1351             }
1352           } finally {
1353             lock.unlock();
1354           }
1355         }
1356 
1357         private void onlineMergingRegion(boolean disabled,
1358             final HRegionInfo hri, final ServerName serverName) {
1359           RegionState regionState = regionStates.getRegionState(hri);
1360           if (regionState != null && regionState.isMerging()
1361               && regionState.isOnServer(serverName)) {
1362             regionOnline(regionState.getRegion(), serverName);
1363             if (disabled) {
1364               unassign(hri);
1365             }
1366           }
1367         }
1368       });
1369     }
1370   }
1371 
1372   /**
1373    * New unassigned node has been created.
1374    *
1375    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1376    * region by creating a znode.
1377    *
1378    * <p>When this happens we must:
1379    * <ol>
1380    *   <li>Watch the node for further children changed events</li>
1381    *   <li>Watch all new children for changed events</li>
1382    * </ol>
1383    */
1384   @Override
1385   public void nodeChildrenChanged(String path) {
1386     if (path.equals(watcher.assignmentZNode)) {
1387       zkEventWorkers.submit(new Runnable() {
1388         @Override
1389         public void run() {
1390           try {
1391             // Just make sure we see the changes for the new znodes
1392             List<String> children =
1393               ZKUtil.listChildrenAndWatchForNewChildren(
1394                 watcher, watcher.assignmentZNode);
1395             if (children != null) {
1396               Stat stat = new Stat();
1397               for (String child : children) {
1398                 // if region is in transition, we already have a watch
1399                 // on it, so no need to watch it again. As far as we know for now,
1400                 // this is needed to watch splitting nodes only.
1401                 if (!regionStates.isRegionInTransition(child)) {
1402                   ZKAssign.getDataAndWatch(watcher, child, stat);
1403                 }
1404               }
1405             }
1406           } catch (KeeperException e) {
1407             server.abort("Unexpected ZK exception reading unassigned children", e);
1408           }
1409         }
1410       });
1411     }
1412   }
1413 
1414   
1415   /**
1416    * Marks the region as online.  Removes it from regions in transition and
1417    * updates the in-memory assignment information.
1418    * <p>
1419    * Used when a region has been successfully opened on a region server.
1420    * @param regionInfo
1421    * @param sn
1422    */
1423   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1424     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1425   }
1426 
1427   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1428     numRegionsOpened.incrementAndGet();
1429     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1430 
1431     // Remove plan if one.
1432     clearRegionPlan(regionInfo);
1433     // Add the server to serversInUpdatingTimer
1434     addToServersInUpdatingTimer(sn);
1435     balancer.regionOnline(regionInfo, sn);
1436 
1437     // Tell our listeners that a region was opened
1438     sendRegionOpenedNotification(regionInfo, sn);
1439   }
1440 
1441   /**
1442    * Pass the assignment event to a worker for processing.
1443    * Each worker is a single thread executor service.  The reason
1444    * for just one thread is to make sure all events for a given
1445    * region are processed in order.
1446    *
1447    * @param path
1448    */
1449   private void handleAssignmentEvent(final String path) {
1450     if (path.startsWith(watcher.assignmentZNode)) {
1451       final String regionName = ZKAssign.getRegionName(watcher, path);
1452 
1453       zkEventWorkersSubmit(new RegionRunnable() {
1454         @Override
1455         public String getRegionName() {
1456           return regionName;
1457         }
1458 
1459         @Override
1460         public void run() {
1461           try {
1462             Stat stat = new Stat();
1463             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1464             if (data == null) return;
1465 
1466             RegionTransition rt = RegionTransition.parseFrom(data);
1467             handleRegion(rt, stat.getVersion());
1468           } catch (KeeperException e) {
1469             server.abort("Unexpected ZK exception reading unassigned node data", e);
1470           } catch (DeserializationException e) {
1471             server.abort("Unexpected exception deserializing node data", e);
1472           }
1473         }
1474       });
1475     }
1476   }
1477 
1478   /**
1479    * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
1480    * will update timers for this server in the background
1481    * @param sn
1482    */
1483   private void addToServersInUpdatingTimer(final ServerName sn) {
1484     if (tomActivated){
1485       this.serversInUpdatingTimer.add(sn);
1486     }
1487   }
1488 
1489   /**
1490    * Touch timers for all regions in transition that have the passed
1491    * <code>sn</code> in common.
1492    * Call this method whenever a server checks in.  Doing so helps the case where
1493    * a new regionserver has joined the cluster and it's been given 1k regions to
1494    * open.  If this method is tickled every time a region reports in a
1495    * successful open then the 1k-th region won't be timed out just because it's
1496    * sitting behind the open of 999 other regions.  This method is NOT used
1497    * as part of bulk assign -- there we have a different mechanism for extending
1498    * the regions in transition timer (we turn it off temporarily -- because
1499    * there is no regionplan involved when bulk assigning).
1500    * @param sn
1501    */
1502   private void updateTimers(final ServerName sn) {
1503     Preconditions.checkState(tomActivated);
1504     if (sn == null) return;
1505 
1506     // This loop could be expensive.
1507     // First make a copy of current regionPlan rather than hold sync while
1508     // First make a copy of the current regionPlans rather than hold sync while
1509     // looping because holding sync can cause deadlock.  It's ok in this loop
1510     List<Map.Entry<String, RegionPlan>> rps;
1511     synchronized(this.regionPlans) {
1512       rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1513     }
1514 
1515     for (Map.Entry<String, RegionPlan> e : rps) {
1516       if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1517         RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1518         if (regionState != null) {
1519           regionState.updateTimestampToNow();
1520         }
1521       }
1522     }
1523   }
1524 
1525   /**
1526    * Marks the region as offline.  Removes it from regions in transition and
1527    * removes in-memory assignment information.
1528    * <p>
1529    * Used when a region has been closed and should remain closed.
1530    * @param regionInfo
1531    */
1532   public void regionOffline(final HRegionInfo regionInfo) {
1533     regionOffline(regionInfo, null);
1534   }
1535 
1536   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1537     if (useZKForAssignment) {
1538       // Disabling so should not be reassigned, just delete the CLOSED node
1539       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1540         "regions in transition, skipping assignment of region " +
1541           regionInfo.getRegionNameAsString());
1542       String encodedName = regionInfo.getEncodedName();
1543       deleteNodeInStates(encodedName, "closed", null,
1544         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1545     }
1546     regionOffline(regionInfo);
1547   }
1548 
1549   // Assignment methods
1550 
1551   /**
1552    * Assigns the specified region.
1553    * <p>
1554    * If a RegionPlan is available with a valid destination then it will be used
1555    * to determine which server the region is assigned to.  If no RegionPlan is
1556    * available, region will be assigned to a random available server.
1557    * <p>
1558    * Updates the RegionState and sends the OPEN RPC.
1559    * <p>
1560    * This will only succeed if the region is in transition and in a CLOSED or
1561    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1562    * chosen server is up and running (It may have just crashed!).  If the
1563    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1564    *
1565    * @param region region to be assigned
1566    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1567    *                       OFFLINE state before assigning the region
1568    */
1569   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1570     assign(region, setOfflineInZK, false);
1571   }
1572 
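  // Illustrative call patterns for this entry point (a sketch; hri is a placeholder
  // HRegionInfo, not a name from the original source):
  //
  //   assign(hri, true);   // normal path: create/transition the znode to OFFLINE before assigning
  //   assign(hri, false);  // znode is already OFFLINE (e.g. the HBCK repair path above),
  //                        // so skip the ZK update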
1573   /**
1574    * Use care with forceNewPlan. It could cause double assignment.
1575    */
1576   public void assign(HRegionInfo region,
1577       boolean setOfflineInZK, boolean forceNewPlan) {
1578     if (isDisabledorDisablingRegionInRIT(region)) {
1579       return;
1580     }
1581     if (this.serverManager.isClusterShutdown()) {
1582       LOG.info("Cluster shutdown is set; skipping assign of " +
1583         region.getRegionNameAsString());
1584       return;
1585     }
1586     String encodedName = region.getEncodedName();
1587     Lock lock = locker.acquireLock(encodedName);
1588     try {
1589       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1590       if (state != null) {
1591         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1592           LOG.info("Skip assigning " + region.getRegionNameAsString()
1593             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1594             + " is dead but not processed yet");
1595           return;
1596         }
1597         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1598       }
1599     } finally {
1600       lock.unlock();
1601     }
1602   }
1603 
1604   /**
1605    * Bulk assign regions to <code>destination</code>.
1606    * @param destination server to assign the regions to
1607    * @param regions Regions to assign.
1608    * @return true if successful
1609    */
1610   boolean assign(final ServerName destination, final List<HRegionInfo> regions) {
1611     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1612     try {
1613       int regionCount = regions.size();
1614       if (regionCount == 0) {
1615         return true;
1616       }
1617       LOG.debug("Assigning " + regionCount + " region(s) to " + destination.toString());
1618       Set<String> encodedNames = new HashSet<String>(regionCount);
1619       for (HRegionInfo region : regions) {
1620         encodedNames.add(region.getEncodedName());
1621       }
1622 
1623       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1624       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1625       try {
1626         AtomicInteger counter = new AtomicInteger(0);
1627         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1628         OfflineCallback cb = new OfflineCallback(
1629           watcher, destination, counter, offlineNodesVersions);
1630         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1631         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1632         for (HRegionInfo region : regions) {
1633           String encodedName = region.getEncodedName();
1634           if (!isDisabledorDisablingRegionInRIT(region)) {
1635             RegionState state = forceRegionStateToOffline(region, false);
1636             boolean onDeadServer = false;
1637             if (state != null) {
1638               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1639                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1640                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1641                   + " is dead but not processed yet");
1642                 onDeadServer = true;
1643               } else if (!useZKForAssignment
1644                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1645                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1646                 plans.put(encodedName, plan);
1647                 states.add(state);
1648                 continue;
1649               }
1650             }
1651             // Reassign if the region wasn't on a dead server
1652             if (!onDeadServer) {
1653               LOG.info("failed to force region state to offline or "
1654                 + "failed to set it offline in ZK, will reassign later: " + region);
1655               failedToOpenRegions.add(region); // assign individually later
1656             }
1657           }
1658           // Release the lock, this region is excluded from bulk assign because
1659           // we can't update its state, or set its znode to offline.
1660           Lock lock = locks.remove(encodedName);
1661           lock.unlock();
1662         }
1663         if (useZKForAssignment) {
1664           // Wait until all unassigned nodes have been put up and watchers set.
1665           int total = states.size();
1666           for (int oldCounter = 0; !server.isStopped();) {
1667             int count = counter.get();
1668             if (oldCounter != count) {
1669               LOG.info(destination.toString() + " unassigned znodes=" + count + " of total="
1670                   + total);
1671               oldCounter = count;
1672             }
1673             if (count >= total) break;
1674             Threads.sleep(5);
1675           }
1676         }
1677 
1678         if (server.isStopped()) {
1679           return false;
1680         }
1681 
1682         // Add region plans, so we can updateTimers when one region is opened so
1683         // that unnecessary timeout on RIT is reduced.
1684         this.addPlans(plans);
1685 
1686         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1687           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1688         for (RegionState state: states) {
1689           HRegionInfo region = state.getRegion();
1690           String encodedRegionName = region.getEncodedName();
1691           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1692           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1693             LOG.warn("failed to offline in zookeeper: " + region);
1694             failedToOpenRegions.add(region); // assign individually later
1695             Lock lock = locks.remove(encodedRegionName);
1696             lock.unlock();
1697           } else {
1698             regionStates.updateRegionState(
1699               region, State.PENDING_OPEN, destination);
1700             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1701             if (this.shouldAssignRegionsWithFavoredNodes) {
1702               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1703             }
1704             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1705               region, nodeVersion, favoredNodes));
1706           }
1707         }
1708 
1709         // Move on to open regions.
1710         try {
1711           // Send OPEN RPC. If it fails on an IOE or RemoteException,
1712           // regions will be assigned individually.
1713           long maxWaitTime = System.currentTimeMillis() +
1714             this.server.getConfiguration().
1715               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1716           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1717             try {
1718               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1719               if (regionOpenInfos.isEmpty()) {
1720                 break;
1721               }
1722               List<RegionOpeningState> regionOpeningStateList = serverManager
1723                 .sendRegionOpen(destination, regionOpenInfos);
1724               if (regionOpeningStateList == null) {
1725                 // Failed getting RPC connection to this server
1726                 return false;
1727               }
1728               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1729                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1730                 if (openingState != RegionOpeningState.OPENED) {
1731                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1732                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1733                     processAlreadyOpenedRegion(region, destination);
1734                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1735                     // Failed opening this region, reassign it later
1736                     failedToOpenRegions.add(region);
1737                   } else {
1738                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1739                       + openingState + " in assigning region " + region);
1740                   }
1741                 }
1742               }
1743               break;
1744             } catch (IOException e) {
1745               if (e instanceof RemoteException) {
1746                 e = ((RemoteException)e).unwrapRemoteException();
1747               }
1748               if (e instanceof RegionServerStoppedException) {
1749                 LOG.warn("The region server was shut down", e);
1750                 // No need to retry, the region server is a goner.
1751                 return false;
1752               } else if (e instanceof ServerNotRunningYetException) {
1753                 long now = System.currentTimeMillis();
1754                 if (now < maxWaitTime) {
1755                   LOG.debug("Server is not yet up; waiting up to " +
1756                     (maxWaitTime - now) + "ms", e);
1757                   Thread.sleep(100);
1758                   i--; // reset the try count
1759                   continue;
1760                 }
1761               } else if (e instanceof java.net.SocketTimeoutException
1762                   && this.serverManager.isServerOnline(destination)) {
1763                 // In case socket is timed out and the region server is still online,
1764                 // the openRegion RPC could have been accepted by the server and
1765                 // just the response didn't go through.  So we will retry to
1766                 // open the region on the same server.
1767                 if (LOG.isDebugEnabled()) {
1768                   LOG.debug("Bulk assigner openRegion() to " + destination
1769                     + " has timed out, but the regions might"
1770                     + " already be opened on it.", e);
1771                 }
1772                 // wait and reset the re-try count, server might be just busy.
1773                 Thread.sleep(100);
1774                 i--;
1775                 continue;
1776               }
1777               throw e;
1778             }
1779           }
1780         } catch (IOException e) {
1781           // Can be a socket timeout, EOF, NoRouteToHost, etc
1782           LOG.info("Unable to communicate with " + destination
1783             + " in order to assign regions, ", e);
1784           return false;
1785         } catch (InterruptedException e) {
1786           throw new RuntimeException(e);
1787         }
1788       } finally {
1789         for (Lock lock : locks.values()) {
1790           lock.unlock();
1791         }
1792       }
1793 
1794       if (!failedToOpenRegions.isEmpty()) {
1795         for (HRegionInfo region : failedToOpenRegions) {
1796           if (!regionStates.isRegionOnline(region)) {
1797             invokeAssign(region);
1798           }
1799         }
1800       }
1801       LOG.debug("Bulk assigning done for " + destination);
1802       return true;
1803     } finally {
1804       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1805     }
1806   }
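
  // A minimal sketch of a bulk-assign caller, assuming a precomputed destination -> regions
  // map named bulkPlan (illustrative only; the real callers are the bulk assigners elsewhere
  // in the master):
  //
  //   Map<ServerName, List<HRegionInfo>> bulkPlan = ...;
  //   for (Map.Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
  //     if (!assign(e.getKey(), e.getValue())) {
  //       // false means the batch could not be sent (server stopped or RPC-level failure);
  //       // the caller decides how to retry.  On a true return, regions that failed to open
  //       // have already been handed to invokeAssign(...) individually.
  //     }
  //   }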
1807 
1808   /**
1809    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1810    *
1811    * The RPC will be sent only to the region server found in the region state
1812    * if it is passed in; otherwise, to the src server specified. If the region
1813    * state is not specified, we don't update the region state at all, instead
1814    * we just send the RPC call. This is useful for some cleanup without
1815    * messing around with the region states (see handleRegion, on the "region opened
1816    * on an unexpected server" scenario, for an example).
1817    */
1818   private void unassign(final HRegionInfo region,
1819       final RegionState state, final int versionOfClosingNode,
1820       final ServerName dest, final boolean transitionInZK,
1821       final ServerName src) {
1822     ServerName server = src;
1823     if (state != null) {
1824       server = state.getServerName();
1825     }
1826     long maxWaitTime = -1;
1827     for (int i = 1; i <= this.maximumAttempts; i++) {
1828       if (this.server.isStopped() || this.server.isAborted()) {
1829         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1830         return;
1831       }
1832       // ClosedRegionHandler can remove the server from this.regions
1833       if (!serverManager.isServerOnline(server)) {
1834         LOG.debug("Offline " + region.getRegionNameAsString()
1835           + ", no need to unassign since it's on a dead server: " + server);
1836         if (transitionInZK) {
1837           // delete the node. if no node exists need not bother.
1838           deleteClosingOrClosedNode(region, server);
1839         }
1840         if (state != null) {
1841           regionOffline(region);
1842         }
1843         return;
1844       }
1845       try {
1846         // Send CLOSE RPC
1847         if (serverManager.sendRegionClose(server, region,
1848           versionOfClosingNode, dest, transitionInZK)) {
1849           LOG.debug("Sent CLOSE to " + server + " for region " +
1850             region.getRegionNameAsString());
1851           if (useZKForAssignment && !transitionInZK && state != null) {
1852             // Retry to make sure the region is
1853             // closed so as to avoid double assignment.
1854             unassign(region, state, versionOfClosingNode,
1855               dest, transitionInZK, src);
1856           }
1857           return;
1858         }
1859         // This never happens. Currently regionserver close always returns true.
1860         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
1861         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1862           region.getRegionNameAsString());
1863       } catch (Throwable t) {
1864         if (t instanceof RemoteException) {
1865           t = ((RemoteException)t).unwrapRemoteException();
1866         }
1867         boolean logRetries = true;
1868         if (t instanceof NotServingRegionException
1869             || t instanceof RegionServerStoppedException
1870             || t instanceof ServerNotRunningYetException) {
1871           LOG.debug("Offline " + region.getRegionNameAsString()
1872             + ", it's not any more on " + server, t);
1873           if (transitionInZK) {
1874             deleteClosingOrClosedNode(region, server);
1875           }
1876           if (state != null) {
1877             regionOffline(region);
1878           }
1879           return;
1880         } else if ((t instanceof FailedServerException) || (state != null &&
1881             t instanceof RegionAlreadyInTransitionException)) {
1882           long sleepTime = 0;
1883           Configuration conf = this.server.getConfiguration();
1884           if(t instanceof FailedServerException) {
1885             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1886                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1887           } else {
1888             // RS is already processing this region, only need to update the timestamp
1889             LOG.debug("Updating the timestamp for " + state);
1890             state.updateTimestampToNow();
1891             if (maxWaitTime < 0) {
1892               maxWaitTime =
1893                   EnvironmentEdgeManager.currentTimeMillis()
1894                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1895                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1896             }
1897             long now = EnvironmentEdgeManager.currentTimeMillis();
1898             if (now < maxWaitTime) {
1899               LOG.debug("Region is already in transition; "
1900                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1901               sleepTime = 100;
1902               i--; // reset the try count
1903               logRetries = false;
1904             }
1905           }
1906           try {
1907             if (sleepTime > 0) {
1908               Thread.sleep(sleepTime);
1909             }
1910           } catch (InterruptedException ie) {
1911             LOG.warn("Failed to unassign "
1912               + region.getRegionNameAsString() + " since interrupted", ie);
1913             Thread.currentThread().interrupt();
1914             if (!tomActivated && state != null) {
1915               regionStates.updateRegionState(region, State.FAILED_CLOSE);
1916             }
1917             return;
1918           }
1919         }
1920 
1921         if (logRetries) {
1922           LOG.info("Server " + server + " returned " + t + " for "
1923             + region.getRegionNameAsString() + ", try=" + i
1924             + " of " + this.maximumAttempts, t);
1925           // Presume retry or server will expire.
1926         }
1927       }
1928     }
1929     // Run out of attempts
1930     if (!tomActivated && state != null) {
1931       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1932     }
1933   }
1934 
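  // Summary of the retry behavior in unassign(...) above:
  //  - NotServingRegionException / RegionServerStoppedException / ServerNotRunningYetException:
  //    the region is treated as already off that server; the znode is deleted (if
  //    transitionInZK) and, when a state was passed in, the region is offlined.
  //  - FailedServerException: sleep past the failed-server expiry before the next attempt.
  //  - RegionAlreadyInTransitionException: refresh the state timestamp and keep retrying the
  //    same server, up to ALREADY_IN_TRANSITION_WAITTIME.
  //  - Anything else: log and retry up to maximumAttempts; after that, if a state was passed
  //    in and the timeout monitor is not active, the region is marked FAILED_CLOSE.
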
1935   /**
1936    * Set region to OFFLINE unless it is open or already opening/closing and forceNewPlan is false.
1937    */
1938   private RegionState forceRegionStateToOffline(
1939       final HRegionInfo region, final boolean forceNewPlan) {
1940     RegionState state = regionStates.getRegionState(region);
1941     if (state == null) {
1942       LOG.warn("Assigning a region not in region states: " + region);
1943       state = regionStates.createRegionState(region);
1944     }
1945 
1946     ServerName sn = state.getServerName();
1947     if (forceNewPlan && LOG.isDebugEnabled()) {
1948       LOG.debug("Force region state offline " + state);
1949     }
1950 
1951     switch (state.getState()) {
1952     case OPEN:
1953     case OPENING:
1954     case PENDING_OPEN:
1955     case CLOSING:
1956     case PENDING_CLOSE:
1957       if (!forceNewPlan) {
1958         LOG.debug("Skip assigning " +
1959           region + ", it is already " + state);
1960         return null;
1961       }
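      // No break: when forceNewPlan is set we fall through and try to close the region
      // first, just like the FAILED_CLOSE/FAILED_OPEN cases below.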
1962     case FAILED_CLOSE:
1963     case FAILED_OPEN:
1964       unassign(region, state, -1, null, false, null);
1965       state = regionStates.getRegionState(region);
1966       if (state.isFailedClose()) {
1967         // If we can't close the region, we can't re-assign
1968         // it so as to avoid possible double assignment/data loss.
1969         LOG.info("Skip assigning " +
1970           region + ", we couldn't close it: " + state);
1971         return null;
1972       }
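      // No break: a region that could be closed (or was already closed) falls through
      // to the OFFLINE checks below.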
1973     case OFFLINE:
1974       // This region could have been open on this server
1975       // for a while. If the server is dead and not processed
1976       // yet, we can move on only if the meta shows the
1977       // region is not on this server actually, or on a server
1978       // not dead, or dead and processed already.
1979       // In case not using ZK, we don't need this check because
1980       // we have the latest info in memory, and the caller
1981       // will do another round checking any way.
1982       if (useZKForAssignment
1983           && regionStates.isServerDeadAndNotProcessed(sn)
1984           && wasRegionOnDeadServerByMeta(region, sn)) {
1985         if (!regionStates.isRegionInTransition(region)) {
1986           LOG.info("Updating the state to " + State.OFFLINE + " to allow it to be reassigned by SSH");
1987           regionStates.updateRegionState(region, State.OFFLINE);
1988         }
1989         LOG.info("Skip assigning " + region.getRegionNameAsString()
1990             + ", it is on a dead but not processed yet server: " + sn);
1991         return null;
1992       }
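      // No break: an OFFLINE region that passed the dead-server check is assignable,
      // same as CLOSED below.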
1993     case CLOSED:
1994       break;
1995     default:
1996       LOG.error("Trying to assign region " + region
1997         + ", which is " + state);
1998       return null;
1999     }
2000     return state;
2001   }
2002 
2003   private boolean wasRegionOnDeadServerByMeta(
2004       final HRegionInfo region, final ServerName sn) {
2005     try {
2006       if (region.isMetaRegion()) {
2007         ServerName server = catalogTracker.getMetaLocation();
2008         return regionStates.isServerDeadAndNotProcessed(server);
2009       }
2010       while (!server.isStopped()) {
2011         try {
2012           catalogTracker.waitForMeta();
2013           Result r = MetaReader.getRegionResult(catalogTracker, region.getRegionName());
2014           if (r == null || r.isEmpty()) return false;
2015           ServerName server = HRegionInfo.getServerName(r);
2016           return regionStates.isServerDeadAndNotProcessed(server);
2017         } catch (IOException ioe) {
2018           LOG.info("Received exception accessing hbase:meta during force assign "
2019             + region.getRegionNameAsString() + ", retrying", ioe);
2020         }
2021       }
2022     } catch (InterruptedException e) {
2023       Thread.currentThread().interrupt();
2024       LOG.info("Interrupted accessing hbase:meta", e);
2025     }
2026     // Call is interrupted or server is stopped.
2027     return regionStates.isServerDeadAndNotProcessed(sn);
2028   }
2029 
2030   /**
2031    * Caller must hold lock on the passed <code>state</code> object.
2032    * @param state
2033    * @param setOfflineInZK
2034    * @param forceNewPlan
2035    */
2036   private void assign(RegionState state,
2037       final boolean setOfflineInZK, final boolean forceNewPlan) {
2038     long startTime = EnvironmentEdgeManager.currentTimeMillis();
2039     try {
2040       Configuration conf = server.getConfiguration();
2041       RegionState currentState = state;
2042       int versionOfOfflineNode = -1;
2043       RegionPlan plan = null;
2044       long maxWaitTime = -1;
2045       HRegionInfo region = state.getRegion();
2046       RegionOpeningState regionOpenState;
2047       Throwable previousException = null;
2048       for (int i = 1; i <= maximumAttempts; i++) {
2049         if (server.isStopped() || server.isAborted()) {
2050           LOG.info("Skip assigning " + region.getRegionNameAsString()
2051             + ", the server is stopped/aborted");
2052           return;
2053         }
2054 
2055         if (plan == null) { // Get a server for the region at first
2056           try {
2057             plan = getRegionPlan(region, forceNewPlan);
2058           } catch (HBaseIOException e) {
2059             LOG.warn("Failed to get region plan", e);
2060           }
2061         }
2062 
2063         if (plan == null) {
2064           LOG.warn("Unable to determine a plan to assign " + region);
2065           if (tomActivated){
2066             this.timeoutMonitor.setAllRegionServersOffline(true);
2067           } else {
2068             if (region.isMetaRegion()) {
2069               if (i == maximumAttempts) {
2070                 i = 0; // re-set attempt count to 0 for at least 1 retry
2071 
2072                 LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2073                   " after maximumAttempts (" + this.maximumAttempts +
2074                   "). Reset attempt count and continue retrying.");
2075               }
2076               waitForRetryingMetaAssignment();
2077               continue;
2078             }
2079 
2080             regionStates.updateRegionState(region, State.FAILED_OPEN);
2081           }
2082           return;
2083         }
2084         if (setOfflineInZK && versionOfOfflineNode == -1) {
2085           // get the version of the znode after setting it to OFFLINE.
2086           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2087           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2088           if (versionOfOfflineNode != -1) {
2089             if (isDisabledorDisablingRegionInRIT(region)) {
2090               return;
2091             }
2092             // In case of assignment from EnableTableHandler table state is ENABLING. Anyhow,
2093             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
2094             // try to set to ENABLED directly then client API may think table is enabled.
2095             // When regions are added directly into hbase:meta and we call assignRegion,
2096             // we need to make the table ENABLED. Hence in such a case the table
2097             // will not be in ENABLING or ENABLED state.
2098             TableName tableName = region.getTable();
2099             if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
2100               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2101               setEnabledTable(tableName);
2102             }
2103           }
2104         }
2105         if (setOfflineInZK && versionOfOfflineNode == -1) {
2106           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2107           // Setting offline in ZK must have failed due to ZK racing or some
2108           // exception which may make the server abort. If it is ZK racing,
2109           // we should retry since we already reset the region state;
2110           // any existing (re)assignment will fail anyway.
2111           if (!server.isAborted()) {
2112             continue;
2113           }
2114         }
2115         LOG.info("Assigning " + region.getRegionNameAsString() +
2116             " to " + plan.getDestination().toString());
2117         // Transition RegionState to PENDING_OPEN
2118         currentState = regionStates.updateRegionState(region,
2119           State.PENDING_OPEN, plan.getDestination());
2120 
2121         boolean needNewPlan;
2122         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2123             " to " + plan.getDestination();
2124         try {
2125           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2126           if (this.shouldAssignRegionsWithFavoredNodes) {
2127             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2128           }
2129           regionOpenState = serverManager.sendRegionOpen(
2130               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2131 
2132           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2133             // Failed opening this region, looping again on a new server.
2134             needNewPlan = true;
2135             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2136                 " trying to assign elsewhere instead; " +
2137                 "try=" + i + " of " + this.maximumAttempts);
2138           } else {
2139             // we're done
2140             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2141               processAlreadyOpenedRegion(region, plan.getDestination());
2142             }
2143             return;
2144           }
2145 
2146         } catch (Throwable t) {
2147           if (t instanceof RemoteException) {
2148             t = ((RemoteException) t).unwrapRemoteException();
2149           }
2150           previousException = t;
2151 
2152           // Should we wait a little before retrying? If the server is starting, yes.
2153           // If the region is already in transition, yes as well: we want to be sure that
2154           //  the region will get opened but we don't want a double assignment.
2155           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2156               t instanceof ServerNotRunningYetException);
2157 
2158           // In case socket is timed out and the region server is still online,
2159           // the openRegion RPC could have been accepted by the server and
2160           // just the response didn't go through.  So we will retry to
2161           // open the region on the same server to avoid possible
2162           // double assignment.
2163           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2164               && this.serverManager.isServerOnline(plan.getDestination()));
2165 
2166           if (hold) {
2167             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2168               "try=" + i + " of " + this.maximumAttempts, t);
2169 
2170             if (maxWaitTime < 0) {
2171               if (t instanceof RegionAlreadyInTransitionException) {
2172                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
2173                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2174                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2175               } else {
2176                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
2177                   + this.server.getConfiguration().getLong(
2178                     "hbase.regionserver.rpc.startup.waittime", 60000);
2179               }
2180             }
2181             try {
2182               needNewPlan = false;
2183               long now = EnvironmentEdgeManager.currentTimeMillis();
2184               if (now < maxWaitTime) {
2185                 LOG.debug("Server is not yet up or region is already in transition; "
2186                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2187                 Thread.sleep(100);
2188                 i--; // reset the try count
2189               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2190                 LOG.debug("Server is not up for a while; try a new one", t);
2191                 needNewPlan = true;
2192               }
2193             } catch (InterruptedException ie) {
2194               LOG.warn("Failed to assign "
2195                   + region.getRegionNameAsString() + " since interrupted", ie);
2196               Thread.currentThread().interrupt();
2197               if (!tomActivated) {
2198                 regionStates.updateRegionState(region, State.FAILED_OPEN);
2199               }
2200               return;
2201             }
2202           } else if (retry) {
2203             needNewPlan = false;
2204             i--; // we want to retry as many times as needed as long as the RS is not dead.
2205             LOG.warn(assignMsg + ", trying to assign to the same region server", t);
2206           } else {
2207             needNewPlan = true;
2208             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2209                 " try=" + i + " of " + this.maximumAttempts, t);
2210           }
2211         }
2212 
2213         if (i == this.maximumAttempts) {
2214           // For meta region, we have to keep retrying until succeeding
2215           if (region.isMetaRegion()) {
2216             i = 0; // re-set attempt count to 0 for at least 1 retry
2217             LOG.warn(assignMsg +
2218                 ", assigning a hbase:meta region has reached maximumAttempts (" +
2219                 this.maximumAttempts + ").  Resetting attempt count and continuing to retry.");
2220             waitForRetryingMetaAssignment();
2221           }
2222           else {
2223             // Don't reset the region state or get a new plan any more.
2224             // This is the last try.
2225             continue;
2226           }
2227         }
2228 
2229         // If region opened on destination of present plan, reassigning to new
2230         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2231         // we keep reassigning to the same RS.
2232         if (needNewPlan) {
2233           // Force a new plan and reassign. Will return null if no servers.
2234           // The new plan could be the same as the existing plan since we don't
2235           // exclude the server of the original plan, which should not be
2236           // excluded since it could be the only server up now.
2237           RegionPlan newPlan = null;
2238           try {
2239             newPlan = getRegionPlan(region, true);
2240           } catch (HBaseIOException e) {
2241             LOG.warn("Failed to get region plan", e);
2242           }
2243           if (newPlan == null) {
2244             if (tomActivated) {
2245               this.timeoutMonitor.setAllRegionServersOffline(true);
2246             } else {
2247               regionStates.updateRegionState(region, State.FAILED_OPEN);
2248             }
2249             LOG.warn("Unable to find a viable location to assign region " +
2250                 region.getRegionNameAsString());
2251             return;
2252           }
2253 
2254           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2255             // Clean out the plan we failed to execute, since it doesn't look like it'll
2256             // succeed anyway; we need a new plan!
2257             // Transition back to OFFLINE
2258             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2259             versionOfOfflineNode = -1;
2260             plan = newPlan;
2261           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2262               previousException instanceof FailedServerException) {
2263             try {
2264               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2265                 " to the same failed server.");
2266               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2267                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2268             } catch (InterruptedException ie) {
2269               LOG.warn("Failed to assign "
2270                   + region.getRegionNameAsString() + " since interrupted", ie);
2271               Thread.currentThread().interrupt();
2272               if (!tomActivated) {
2273                 regionStates.updateRegionState(region, State.FAILED_OPEN);
2274               }
2275               return;
2276             }
2277           }
2278         }
2279       }
2280       // Run out of attempts
2281       if (!tomActivated) {
2282         regionStates.updateRegionState(region, State.FAILED_OPEN);
2283       }
2284     } finally {
2285       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
2286     }
2287   }
2288 
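  // Note on the retry loop in assign(RegionState, ...) above:
  //  - a "hold" error (RegionAlreadyInTransitionException, ServerNotRunningYetException)
  //    waits and retries the same plan without burning an attempt;
  //  - a socket timeout with the destination still online also retries the same server,
  //    to avoid a double assignment if the OPEN actually went through;
  //  - any other failure asks getRegionPlan(region, true) for a new plan, and the region
  //    ends up FAILED_OPEN once maximumAttempts is exhausted (hbase:meta is the exception:
  //    its attempt count is reset so assignment is retried indefinitely).
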
2289   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2290     // Remove region from in-memory transition and unassigned node from ZK
2291     // While trying to enable the table the regions of the table were
2292     // already enabled.
2293     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2294       + " to " + sn);
2295     String encodedName = region.getEncodedName();
2296     deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2297     regionStates.regionOnline(region, sn);
2298   }
2299 
2300   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2301     TableName tableName = region.getTable();
2302     boolean disabled = this.zkTable.isDisabledTable(tableName);
2303     if (disabled || this.zkTable.isDisablingTable(tableName)) {
2304       LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
2305         " skipping assign of " + region.getRegionNameAsString());
2306       offlineDisabledRegion(region);
2307       return true;
2308     }
2309     return false;
2310   }
2311 
2312   /**
2313    * Wait for some time before retrying meta table region assignment
2314    */
2315   private void waitForRetryingMetaAssignment() {
2316     try {
2317       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2318     } catch (InterruptedException e) {
2319       LOG.error("Got interrupted while waiting for hbase:meta assignment", e);
2320       Thread.currentThread().interrupt();
2321     }
2322   }
2323 
2324   /**
2325    * Set region as OFFLINED up in zookeeper
2326    *
2327    * @param state
2328    * @return the version of the offline node if setting of the OFFLINE node was
2329    *         successful, -1 otherwise.
2330    */
2331   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2332     if (!state.isClosed() && !state.isOffline()) {
2333       String msg = "Unexpected state: " + state + ". Cannot transition it to OFFLINE.";
2334       this.server.abort(msg, new IllegalStateException(msg));
2335       return -1;
2336     }
2337     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2338     int versionOfOfflineNode;
2339     try {
2340       // get the version after setting the znode to OFFLINE
2341       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2342         state.getRegion(), destination);
2343       if (versionOfOfflineNode == -1) {
2344         LOG.warn("Attempted to create/force node into OFFLINE state before "
2345             + "completing assignment but failed to do so for " + state);
2346         return -1;
2347       }
2348     } catch (KeeperException e) {
2349       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2350       return -1;
2351     }
2352     return versionOfOfflineNode;
2353   }
2354 
2355   /**
2356    * @param region the region to assign
2357    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2358    * if no servers to assign, it returns null).
2359    */
2360   private RegionPlan getRegionPlan(final HRegionInfo region,
2361       final boolean forceNewPlan)  throws HBaseIOException {
2362     return getRegionPlan(region, null, forceNewPlan);
2363   }
2364 
2365   /**
2366    * @param region the region to assign
2367    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2368    * all servers are thought to be assignable.
2369    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2370    * will be generated.
2371    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2372    * if no servers to assign, it returns null).
2373    */
2374   private RegionPlan getRegionPlan(final HRegionInfo region,
2375       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2376     // Pickup existing plan or make a new one
2377     final String encodedName = region.getEncodedName();
2378     final List<ServerName> destServers =
2379       serverManager.createDestinationServersList(serverToExclude);
2380 
2381     if (destServers.isEmpty()){
2382       LOG.warn("Can't move " + encodedName +
2383         ", there is no destination server available.");
2384       return null;
2385     }
2386 
2387     RegionPlan randomPlan = null;
2388     boolean newPlan = false;
2389     RegionPlan existingPlan;
2390 
2391     synchronized (this.regionPlans) {
2392       existingPlan = this.regionPlans.get(encodedName);
2393 
2394       if (existingPlan != null && existingPlan.getDestination() != null) {
2395         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2396           + "; destination server is " + existingPlan.getDestination() +
2397             "; accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2398       }
2399 
2400       if (forceNewPlan
2401           || existingPlan == null
2402           || existingPlan.getDestination() == null
2403           || !destServers.contains(existingPlan.getDestination())) {
2404         newPlan = true;
2405         randomPlan = new RegionPlan(region, null,
2406             balancer.randomAssignment(region, destServers));
2407         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2408           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2409           regions.add(region);
2410           try {
2411             processFavoredNodes(regions);
2412           } catch (IOException ie) {
2413             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2414           }
2415         }
2416         this.regionPlans.put(encodedName, randomPlan);
2417       }
2418     }
2419 
2420     if (newPlan) {
2421       if (randomPlan.getDestination() == null) {
2422         LOG.warn("Can't find a destination for " + encodedName);
2423         return null;
2424       }
2425       LOG.debug("No previous transition plan found (or ignoring " +
2426         "an existing plan) for " + region.getRegionNameAsString() +
2427         "; generated random plan=" + randomPlan + "; " +
2428         serverManager.countOfRegionServers() +
2429         " (online=" + serverManager.getOnlineServers().size() +
2430         ", available=" + destServers.size() + ") available servers" +
2431         ", forceNewPlan=" + forceNewPlan);
2432       return randomPlan;
2433     }
2434     LOG.debug("Using pre-existing plan for " +
2435       region.getRegionNameAsString() + "; plan=" + existingPlan);
2436     return existingPlan;
2437   }
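
  // Decision summary for getRegionPlan(...) above: forceNewPlan, no cached plan, a null
  // destination, or a destination no longer in the live destination-server list all lead to
  // a new plan built via balancer.randomAssignment(...); otherwise the cached plan from
  // regionPlans is reused.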
2438 
2439   /**
2440    * Unassigns the specified region.
2441    * <p>
2442    * Updates the RegionState and sends the CLOSE RPC unless region is being
2443    * split by regionserver; then the unassign fails (silently) because we
2444    * presume the region being unassigned no longer exists (it's been split out
2445    * of existence). TODO: What to do if split fails and is rolled back and
2446    * parent is revivified?
2447    * <p>
2448    * If a RegionPlan is already set, it will remain.
2449    *
2450    * @param region region to be unassigned
2451    */
2452   public void unassign(HRegionInfo region) {
2453     unassign(region, false);
2454   }
2455 
2456 
2457   /**
2458    * Unassigns the specified region.
2459    * <p>
2460    * Updates the RegionState and sends the CLOSE RPC unless region is being
2461    * split by regionserver; then the unassign fails (silently) because we
2462    * presume the region being unassigned no longer exists (it's been split out
2463    * of existence). TODO: What to do if split fails and is rolled back and
2464    * parent is revivified?
2465    * <p>
2466    * If a RegionPlan is already set, it will remain.
2467    *
2468    * @param region region to be unassigned
2469    * @param force if region should be closed even if already closing
2470    */
2471   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2472     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2473     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2474       + " (offlining), current state: " + regionStates.getRegionState(region));
2475 
2476     String encodedName = region.getEncodedName();
2477     // Grab the state of this region and synchronize on it
2478     int versionOfClosingNode = -1;
2479     // We need a lock here as we're going to do a put later and we don't want multiple states
2480     //  creation
2481     ReentrantLock lock = locker.acquireLock(encodedName);
2482     RegionState state = regionStates.getRegionTransitionState(encodedName);
2483     boolean reassign = true;
2484     try {
2485       if (state == null) {
2486         // Region is not in transition.
2487         // We can unassign it only if it's not SPLIT/MERGED.
2488         state = regionStates.getRegionState(encodedName);
2489         if (state != null && state.isUnassignable()) {
2490           LOG.info("Attempting to unassign " + state + ", ignored");
2491           // Offline region will be reassigned below
2492           return;
2493         }
2494         // Create the znode in CLOSING state
2495         try {
2496           if (state == null || state.getServerName() == null) {
2497             // We don't know where the region is, offline it.
2498             // No need to send CLOSE RPC
2499             LOG.warn("Attempting to unassign a region not in RegionStates: "
2500               + region.getRegionNameAsString() + ", offlined");
2501             regionOffline(region);
2502             return;
2503           }
2504           if (useZKForAssignment) {
2505             versionOfClosingNode = ZKAssign.createNodeClosing(
2506               watcher, region, state.getServerName());
2507             if (versionOfClosingNode == -1) {
2508               LOG.info("Attempting to unassign " +
2509                 region.getRegionNameAsString() + " but ZK closing node "
2510                 + "can't be created.");
2511               reassign = false; // not unassigned at all
2512               return;
2513             }
2514           }
2515         } catch (KeeperException e) {
2516           if (e instanceof NodeExistsException) {
2517             // Handle race between master initiated close and regionserver
2518             // orchestrated splitting. See if existing node is in a
2519             // SPLITTING or SPLIT state.  If so, the regionserver started
2520             // an op on node before we could get our CLOSING in.  Deal.
2521             NodeExistsException nee = (NodeExistsException)e;
2522             String path = nee.getPath();
2523             try {
2524               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2525                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2526                   "skipping unassign because region no longer exists -- its split or merge");
2527                 reassign = false; // no need to reassign for split/merged region
2528                 return;
2529               }
2530             } catch (KeeperException.NoNodeException ke) {
2531               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2532                 "; presuming split and that the region to unassign, " +
2533                 encodedName + ", no longer exists -- confirm", ke);
2534               return;
2535             } catch (KeeperException ke) {
2536               LOG.error("Unexpected zk state", ke);
2537             } catch (DeserializationException de) {
2538               LOG.error("Failed parse", de);
2539             }
2540           }
2541           // If we get here, we don't understand what's going on -- abort.
2542           server.abort("Unexpected ZK exception creating node CLOSING", e);
2543           reassign = false; // heading out already
2544           return;
2545         }
2546         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2547       } else if (state.isFailedOpen()) {
2548         // The region is not open yet
2549         regionOffline(region);
2550         return;
2551       } else if (force && state.isPendingCloseOrClosing()) {
2552         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2553           " which is already " + state.getState()  +
2554           " but forcing to send a CLOSE RPC again ");
2555         if (state.isFailedClose()) {
2556           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2557         }
2558         state.updateTimestampToNow();
2559       } else {
2560         LOG.debug("Attempting to unassign " +
2561           region.getRegionNameAsString() + " but it is " +
2562           "already in transition (" + state.getState() + ", force=" + force + ")");
2563         return;
2564       }
2565 
2566       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2567     } finally {
2568       lock.unlock();
2569 
2570       // Region is expected to be reassigned afterwards
2571       if (reassign && regionStates.isRegionOffline(region)) {
2572         assign(region, true);
2573       }
2574     }
2575   }
2576 
2577   public void unassign(HRegionInfo region, boolean force){
2578      unassign(region, force, null);
2579   }
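       // Illustrative usage (hedged sketch; "am" and "hri" are hypothetical local names):
       //   am.unassign(hri);        // normal close; any existing RegionPlan is kept
       //   am.unassign(hri, true);  // force another CLOSE RPC even if already closing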
2580 
2581   /**
2582    * @param region region whose CLOSING or CLOSED znode is to be deleted
2583    */
2584   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2585     String encodedName = region.getEncodedName();
2586     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2587       EventType.RS_ZK_REGION_CLOSED);
2588   }
2589 
2590   /**
2591    * @param path znode path to check
2592    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2593    * @throws KeeperException Can happen if the znode went away in the meantime.
2594    * @throws DeserializationException if the znode data cannot be parsed
2595    */
2596   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2597       throws KeeperException, DeserializationException {
2598     boolean result = false;
2599     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2600     // cleaned up before we can get data from it.
2601     byte [] data = ZKAssign.getData(watcher, path);
2602     if (data == null) {
2603       LOG.info("Node " + path + " is gone");
2604       return false;
2605     }
2606     RegionTransition rt = RegionTransition.parseFrom(data);
2607     switch (rt.getEventType()) {
2608     case RS_ZK_REQUEST_REGION_SPLIT:
2609     case RS_ZK_REGION_SPLIT:
2610     case RS_ZK_REGION_SPLITTING:
2611     case RS_ZK_REQUEST_REGION_MERGE:
2612     case RS_ZK_REGION_MERGED:
2613     case RS_ZK_REGION_MERGING:
2614       result = true;
2615       break;
2616     default:
2617       LOG.info("Node " + path + " is in " + rt.getEventType());
2618       break;
2619     }
2620     return result;
2621   }
2622 
2623   /**
2624    * Used by unit tests. Return the number of regions opened so far in the life
2625    * of the master. Increases by one every time the master opens a region.
2626    * @return the counter value of the number of regions opened so far
2627    */
2628   public int getNumRegionsOpened() {
2629     return numRegionsOpened.get();
2630   }
2631 
2632   /**
2633    * Waits until the specified region has completed assignment.
2634    * <p>
2635    * If the region is already assigned, returns immediately.  Otherwise, method
2636    * blocks until the region is assigned; returns false if the region fails to open or the master is stopped.
2637    * @param regionInfo region to wait on assignment for
2638    * @throws InterruptedException
2639    */
2640   public boolean waitForAssignment(HRegionInfo regionInfo)
2641       throws InterruptedException {
2642     while (!regionStates.isRegionOnline(regionInfo)) {
2643       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2644           || this.server.isStopped()) {
2645         return false;
2646       }
2647 
2648       // We should receive a notification, but it's
2649       //  better to have a timeout to recheck the condition here:
2650       //  it lowers the impact of a race condition if any
2651       regionStates.waitForUpdate(100);
2652     }
2653     return true;
2654   }
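       // Illustrative usage (hedged sketch; "am" and "hri" are hypothetical local names):
       //   am.assign(hri, true);
       //   if (!am.waitForAssignment(hri)) {
       //     // open failed (FAILED_OPEN) or the master is stopping
       //   }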
2655 
2656   /**
2657    * Assigns the hbase:meta region.
2658    * <p>
2659    * Assumes that hbase:meta is currently closed and is not being actively served by
2660    * any RegionServer.
2661    * <p>
2662    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2663    * hbase:meta to a random RegionServer.
2664    * @throws KeeperException
2665    */
2666   public void assignMeta() throws KeeperException {
2667     MetaRegionTracker.deleteMetaLocation(this.watcher);
2668     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2669   }
2670 
2671   /**
2672    * Assigns specified regions retaining assignments, if any.
2673    * <p>
2674    * This is a synchronous call and will return once every region has been
2675    * assigned.  If anything fails, an exception is thrown.
2676    * @throws InterruptedException
2677    * @throws IOException
2678    */
2679   public void assign(Map<HRegionInfo, ServerName> regions)
2680         throws IOException, InterruptedException {
2681     if (regions == null || regions.isEmpty()) {
2682       return;
2683     }
2684     List<ServerName> servers = serverManager.createDestinationServersList();
2685     if (servers == null || servers.isEmpty()) {
2686       throw new IOException("Found no destination server to assign region(s)");
2687     }
2688 
2689     // Reuse existing assignment info
2690     Map<ServerName, List<HRegionInfo>> bulkPlan =
2691       balancer.retainAssignment(regions, servers);
2692 
2693     assign(regions.size(), servers.size(),
2694       "retainAssignment=true", bulkPlan);
2695   }
2696 
2697   /**
2698    * Assigns specified regions round robin, if any.
2699    * <p>
2700    * This is a synchronous call and will return once every region has been
2701    * assigned.  If anything fails, an exception is thrown.
2702    * @throws InterruptedException
2703    * @throws IOException
2704    */
2705   public void assign(List<HRegionInfo> regions)
2706         throws IOException, InterruptedException {
2707     if (regions == null || regions.isEmpty()) {
2708       return;
2709     }
2710 
2711     List<ServerName> servers = serverManager.createDestinationServersList();
2712     if (servers == null || servers.isEmpty()) {
2713       throw new IOException("Found no destination server to assign region(s)");
2714     }
2715 
2716     // Generate a round-robin bulk assignment plan
2717     Map<ServerName, List<HRegionInfo>> bulkPlan
2718       = balancer.roundRobinAssignment(regions, servers);
2719     processFavoredNodes(regions);
2720 
2721     assign(regions.size(), servers.size(),
2722       "round-robin=true", bulkPlan);
2723   }
2724 
2725   private void assign(int regions, int totalServers,
2726       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2727           throws InterruptedException, IOException {
2728 
2729     int servers = bulkPlan.size();
2730     if (servers == 1 || (regions < bulkAssignThresholdRegions
2731         && servers < bulkAssignThresholdServers)) {
2732 
2733       // Not using bulk assignment. This can be more efficient in a small
2734       // cluster, especially a mini cluster for testing, so that tests won't time out.
2735       if (LOG.isTraceEnabled()) {
2736         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2737           " region(s) to " + servers + " server(s)");
2738       }
2739       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2740         if (!assign(plan.getKey(), plan.getValue())) {
2741           for (HRegionInfo region: plan.getValue()) {
2742             if (!regionStates.isRegionOnline(region)) {
2743               invokeAssign(region);
2744             }
2745           }
2746         }
2747       }
2748     } else {
2749       LOG.info("Bulk assigning " + regions + " region(s) across "
2750         + totalServers + " server(s), " + message);
2751 
2752       // Use fixed count thread pool assigning.
2753       BulkAssigner ba = new GeneralBulkAssigner(
2754         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2755       ba.bulkAssign();
2756       LOG.info("Bulk assigning done");
2757     }
2758   }
2759 
2760   /**
2761    * Assigns all user regions, if any exist.  Used during cluster startup.
2762    * <p>
2763    * This is a synchronous call and will return once every region has been
2764    * assigned.  If anything fails, an exception is thrown and the cluster
2765    * should be shutdown.
2766    * @throws InterruptedException
2767    * @throws IOException
2768    * @throws KeeperException
2769    */
2770   private void assignAllUserRegions(Set<TableName> disabledOrDisablingOrEnabling)
2771       throws IOException, InterruptedException, KeeperException {
2772     // Skip assignment for regions of tables in DISABLING state because during a clean cluster startup
2773     // no RS is alive and the regions map doesn't have any information about those regions either.
2774     // See HBASE-6281.
2775     // Scan hbase:meta for all user regions, skipping any disabled tables
2776     Map<HRegionInfo, ServerName> allRegions;
2777     SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
2778        new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
2779     snapshotOfRegionAssignment.initialize();
2780     allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
2781     if (allRegions == null || allRegions.isEmpty()) {
2782       return;
2783     }
2784 
2785     // Determine what type of assignment to do on startup
2786     boolean retainAssignment = server.getConfiguration().
2787       getBoolean("hbase.master.startup.retainassign", true);
2788 
2789     if (retainAssignment) {
2790       assign(allRegions);
2791     } else {
2792       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2793       assign(regions);
2794     }
2795 
2796     for (HRegionInfo hri : allRegions.keySet()) {
2797       TableName tableName = hri.getTable();
2798       if (!zkTable.isEnabledTable(tableName)) {
2799         setEnabledTable(tableName);
2800       }
2801     }
2802   }
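       // Illustrative hbase-site.xml override (property name taken from the code above; the
       // value shown is only an example): disable assignment retention at startup so that
       // regions are spread round-robin instead.
       //   <property>
       //     <name>hbase.master.startup.retainassign</name>
       //     <value>false</value>
       //   </property>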
2803 
2804   /**
2805    * Wait until no regions in transition.
2806    * @param timeout How long to wait.
2807    * @return True if no regions remain in transition.
2808    * @throws InterruptedException
2809    */
2810   boolean waitUntilNoRegionsInTransition(final long timeout)
2811       throws InterruptedException {
2812     // Blocks until there are no regions in transition. It is possible that
2813     // there are regions in transition immediately after this returns, but it
2814     // guarantees that if it returns without an exception there was a period
2815     // of time with no regions in transition from the point-of-view of the
2816     // in-memory state of the Master.
2818     final long endTime = System.currentTimeMillis() + timeout;
2819 
2820     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2821         && endTime > System.currentTimeMillis()) {
2822       regionStates.waitForUpdate(100);
2823     }
2824 
2825     return !regionStates.isRegionsInTransition();
2826   }
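       // Illustrative usage (hedged sketch; "am" is a hypothetical reference to this manager):
       //   if (!am.waitUntilNoRegionsInTransition(60 * 1000)) {
       //     LOG.warn("Regions still in transition after waiting one minute");
       //   }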
2827 
2828   /**
2829    * Rebuild the list of user regions and assignment information.
2830    * <p>
2831    * Returns a map of servers that are not found to be online and the regions
2832    * they were hosting.
2833    * @return map of servers not online to their assigned regions, as stored
2834    *         in META
2835    * @throws IOException
2836    */
2837   Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
2838     Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2839     Set<TableName> disabledTables = ZKTable.getDisabledTables(watcher);
2840     Set<TableName> disabledOrEnablingTables = new HashSet<TableName>(disabledTables);
2841     disabledOrEnablingTables.addAll(enablingTables);
2842     Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
2843     disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
2844 
2845     // Region assignment from META
2846     List<Result> results = MetaReader.fullScan(this.catalogTracker);
2847     // Get any new but slow-to-check-in region servers that joined the cluster
2848     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2849     // Map of offline servers and their regions to be returned
2850     Map<ServerName, List<HRegionInfo>> offlineServers =
2851       new TreeMap<ServerName, List<HRegionInfo>>();
2852     // Iterate regions in META
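         // For each region below: record its state; regions on servers that are not online
         // are collected into the returned offlineServers map; regions of enabled tables on
         // live servers are marked online (in memory and on the balancer); regions of
         // disabled/enabling tables are skipped (and marked offline when ZK-based assignment
         // is in use).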
2853     for (Result result : results) {
2854       HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
2855       if (regionInfo == null) continue;
2856       State state = RegionStateStore.getRegionState(result);
2857       ServerName regionLocation = RegionStateStore.getRegionServer(result);
2858       if (disabledTables.contains(regionInfo.getTable())) {
2859         // ignore regionLocation for disabled tables.
2860         regionLocation = null;
2861       }
2862       regionStates.createRegionState(regionInfo, state, regionLocation);
2863       if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
2864         // Region is not open (either offline or in transition), skip
2865         continue;
2866       }
2867       TableName tableName = regionInfo.getTable();
2868       if (!onlineServers.contains(regionLocation)) {
2869         // Region is located on a server that isn't online
2870         List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
2871         if (offlineRegions == null) {
2872           offlineRegions = new ArrayList<HRegionInfo>(1);
2873           offlineServers.put(regionLocation, offlineRegions);
2874         }
2875         if (useZKForAssignment) {
2876           regionStates.regionOffline(regionInfo);
2877         }
2878         offlineRegions.add(regionInfo);
2879       } else if (!disabledOrEnablingTables.contains(tableName)) {
2880         // Region is being served and on an active server
2881         // add only if region not in disabled or enabling table
2882 
2883         regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
2884         regionStates.regionOnline(regionInfo, regionLocation);
2885         balancer.regionOnline(regionInfo, regionLocation);
2886       } else if (useZKForAssignment) {
2887         regionStates.regionOffline(regionInfo);
2888       }
2889       // need to enable the table if not disabled or disabling or enabling
2890       // this will be used in rolling restarts
2891       if (!disabledOrDisablingOrEnabling.contains(tableName)
2892           && !getZKTable().isEnabledTable(tableName)) {
2893         setEnabledTable(tableName);
2894       }
2895 
2896     }
2897     return offlineServers;
2898   }
2899 
2900   /**
2901    * Recover the tables that were not fully moved to DISABLED state. These
2902    * tables were left in DISABLING state when the master restarted or switched over.
2903    *
2904    * @throws KeeperException
2905    * @throws TableNotFoundException
2906    * @throws IOException
2907    */
2908   private void recoverTableInDisablingState()
2909       throws KeeperException, TableNotFoundException, IOException {
2910     Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher);
2911     if (disablingTables.size() != 0) {
2912       for (TableName tableName : disablingTables) {
2913         // Recover by calling DisableTableHandler
2914         LOG.info("The table " + tableName
2915             + " is in DISABLING state.  Hence recovering by moving the table"
2916             + " to DISABLED state.");
2917         new DisableTableHandler(this.server, tableName, catalogTracker,
2918             this, tableLockManager, true).prepare().process();
2919       }
2920     }
2921   }
2922 
2923   /**
2924    * Recover the tables that were not fully moved to ENABLED state. These tables
2925    * were left in ENABLING state when the master restarted or switched over.
2926    *
2927    * @throws KeeperException
2928    * @throws org.apache.hadoop.hbase.TableNotFoundException
2929    * @throws IOException
2930    */
2931   private void recoverTableInEnablingState()
2932       throws KeeperException, TableNotFoundException, IOException {
2933     Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2934     if (enablingTables.size() != 0) {
2935       for (TableName tableName : enablingTables) {
2936         // Recover by calling EnableTableHandler
2937         LOG.info("The table " + tableName
2938             + " is in ENABLING state.  Hence recovering by moving the table"
2939             + " to ENABLED state.");
2940         // enableTable in sync way during master startup,
2941         // no need to invoke coprocessor
2942         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2943           catalogTracker, this, tableLockManager, true);
2944         try {
2945           eth.prepare();
2946         } catch (TableNotFoundException e) {
2947           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2948           continue;
2949         }
2950         eth.process();
2951       }
2952     }
2953   }
2954 
2955   /**
2956    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
2957    * <p>
2958    * This is used for failover to recover the lost regions that belonged to
2959    * RegionServers which failed while there was no active master or regions
2960    * that were in RIT.
2961    * <p>
2962    *
2963    *
2964    * @param deadServers
2965    *          The list of dead servers which failed while there was no active
2966    *          master. Can be null.
2967    * @throws IOException
2968    * @throws KeeperException
2969    */
2970   private void processDeadServersAndRecoverLostRegions(
2971       Map<ServerName, List<HRegionInfo>> deadServers)
2972           throws IOException, KeeperException {
2973     if (deadServers != null) {
2974       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2975         ServerName serverName = server.getKey();
2976         // We need to keep such info even if the server is known dead
2977         regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
2978         if (!serverManager.isServerDead(serverName)) {
2979           serverManager.expireServer(serverName); // Let SSH do region re-assign
2980         }
2981       }
2982     }
2983 
2984     List<String> nodes = useZKForAssignment ?
2985       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
2986       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
2987     if (nodes != null && !nodes.isEmpty()) {
2988       for (String encodedRegionName : nodes) {
2989         processRegionInTransition(encodedRegionName, null);
2990       }
2991     } else if (!useZKForAssignment) {
2992        processRegionInTransitionZkLess();
2993     }
2994   }
2995   
2996   void processRegionInTransitionZkLess() {
2997     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
2998     // in case the RPC call is not sent out yet before the master was shut down
2999     // since we update the state before we send the RPC call. We can't update
3000     // the state after the RPC call. Otherwise, we don't know what's happened
3001     // to the region if the master dies right after the RPC call is out.
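         // In summary (derived from the switch below): CLOSED regions are re-assigned,
         // PENDING_OPEN/PENDING_CLOSE regions get their open/close RPC re-sent, and
         // FAILED_OPEN/FAILED_CLOSE regions are unassigned so they can be retried.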
3002     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3003     for (RegionState regionState : rits.values()) {
3004       LOG.info("Processing " + regionState);
3005       ServerName serverName = regionState.getServerName();
3006       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3007       // case, try assigning it here.
3008       if (serverName != null
3009           && !serverManager.getOnlineServers().containsKey(serverName)) {
3010         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3011         continue; 
3012       }
3013       HRegionInfo regionInfo = regionState.getRegion();
3014       State state = regionState.getState();
3015       
3016       switch (state) {
3017       case CLOSED:
3018         invokeAssign(regionInfo);
3019         break;
3020       case PENDING_OPEN:
3021         retrySendRegionOpen(regionState);
3022         break;
3023       case PENDING_CLOSE:
3024         retrySendRegionClose(regionState);
3025         break;
3026       case FAILED_CLOSE:
3027       case FAILED_OPEN:  
3028         invokeUnassign(regionInfo);
3029         break;
3030       default:
3031         // No process for other states
3032       }
3033     }
3034   }
3035 
3036   /**
3037    * At master failover, for a region stuck in PENDING_OPEN, make sure the
3038    * sendRegionOpen RPC call is re-sent to the target regionserver.
3039    */
3040   private void retrySendRegionOpen(final RegionState regionState) {
3041     this.executorService.submit(
3042       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3043         @Override
3044         public void process() throws IOException {
3045           HRegionInfo hri = regionState.getRegion();
3046           ServerName serverName = regionState.getServerName();
3047           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3048           try {
3049             while (serverManager.isServerOnline(serverName)
3050                 && !server.isStopped() && !server.isAborted()) {
3051               try {
3052                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3053                 if (shouldAssignRegionsWithFavoredNodes) {
3054                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3055                 }
3056                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3057                   serverName, hri, -1, favoredNodes);
3058 
3059                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3060                   // Failed opening this region, this means the target server didn't get
3061                   // the original region open RPC, so re-assign it with a new plan
3062                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3063                     + regionState + ", re-assign it");
3064                   invokeAssign(hri, true);
3065                 }
3066                 return; // Done.
3067               } catch (Throwable t) {
3068                 if (t instanceof RemoteException) {
3069                   t = ((RemoteException) t).unwrapRemoteException();
3070                 }
3071                 // In case SocketTimeoutException/FailedServerException, we will retry
3072                 if (t instanceof java.net.SocketTimeoutException
3073                     || t instanceof FailedServerException) {
3074                   Threads.sleep(100);
3075                   continue;
3076                 }
3077                 // For other exceptions, re-assign it
3078                 LOG.debug("Got exception in retry sendRegionOpen for "
3079                   + regionState + ", re-assign it", t);
3080                 invokeAssign(hri);
3081                 return; // Done.
3082               }
3083             }
3084           } finally {
3085             lock.unlock();
3086           }
3087         }
3088       });
3089   }
3090 
3091   /**
3092    * At master failover, for a region stuck in PENDING_CLOSE, make sure the
3093    * sendRegionClose RPC call is re-sent to the target regionserver.
3094    */
3095   private void retrySendRegionClose(final RegionState regionState) {
3096     this.executorService.submit(
3097       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3098         @Override
3099         public void process() throws IOException {
3100           HRegionInfo hri = regionState.getRegion();
3101           ServerName serverName = regionState.getServerName();
3102           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3103           try {
3104             while (serverManager.isServerOnline(serverName)
3105                 && !server.isStopped() && !server.isAborted()) {
3106               try {
3107                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3108                   // This means the region is still on the target server
3109                   LOG.debug("Got false in retry sendRegionClose for "
3110                     + regionState + ", re-close it");
3111                   invokeUnAssign(hri);
3112                 }
3113                 return; // Done.
3114               } catch (Throwable t) {
3115                 if (t instanceof RemoteException) {
3116                   t = ((RemoteException) t).unwrapRemoteException();
3117                 }
3118                 // In case SocketTimeoutException/FailedServerException, we will retry
3119                 if (t instanceof java.net.SocketTimeoutException
3120                     || t instanceof FailedServerException) {
3121                   Threads.sleep(100);
3122                   continue;
3123                 }
3124                 if (!(t instanceof NotServingRegionException
3125                     || t instanceof RegionAlreadyInTransitionException)) {
3126                   // NotServingRegionException/RegionAlreadyInTransitionException
3127                   // means the target server got the original region close request.
3128                   // For other exceptions, re-close it
3129                   LOG.debug("Got exception in retry sendRegionClose for "
3130                     + regionState + ", re-close it", t);
3131                   invokeUnAssign(hri);
3132                 }
3133                 return; // Done.
3134               }
3135             }
3136           } finally {
3137             lock.unlock();
3138           }
3139         }
3140       });
3141   }
3142 
3143   /**
3144    * Set regions-in-transition metrics.
3145    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3146    * This iterator is not fail fast, which may lead to stale reads; but that's better than
3147    * creating a copy of the map for metrics computation, as this method is invoked
3148    * at a frequent interval.
3149    */
3150   public void updateRegionsInTransitionMetrics() {
3151     long currentTime = System.currentTimeMillis();
3152     int totalRITs = 0;
3153     int totalRITsOverThreshold = 0;
3154     long oldestRITTime = 0;
3155     int ritThreshold = this.server.getConfiguration().
3156       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3157     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3158       totalRITs++;
3159       long ritTime = currentTime - state.getStamp();
3160       if (ritTime > ritThreshold) { // more than the threshold
3161         totalRITsOverThreshold++;
3162       }
3163       if (oldestRITTime < ritTime) {
3164         oldestRITTime = ritTime;
3165       }
3166     }
3167     if (this.metricsAssignmentManager != null) {
3168       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3169       this.metricsAssignmentManager.updateRITCount(totalRITs);
3170       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3171     }
3172   }
3173 
3174   /**
3175    * @param region Region whose plan we are to clear.
3176    */
3177   void clearRegionPlan(final HRegionInfo region) {
3178     synchronized (this.regionPlans) {
3179       this.regionPlans.remove(region.getEncodedName());
3180     }
3181   }
3182 
3183   /**
3184    * Wait on region to clear regions-in-transition.
3185    * @param hri Region to wait on.
3186    * @throws IOException
3187    */
3188   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3189       throws IOException, InterruptedException {
3190     waitOnRegionToClearRegionsInTransition(hri, -1L);
3191   }
3192 
3193   /**
3194    * Wait on region to clear regions-in-transition or time out
3195    * @param hri Region to wait on.
3196    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3197    * @return True when a region clears regions-in-transition before timeout otherwise false
3198    * @throws InterruptedException
3199    */
3200   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3201       throws InterruptedException {
3202     if (!regionStates.isRegionInTransition(hri)) return true;
3203     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
3204         + timeOut;
3205     // There is already a timeout monitor on regions in transition so I
3206     // should not have to have one here too?
3207     LOG.info("Waiting for " + hri.getEncodedName() +
3208         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3209     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3210       regionStates.waitForUpdate(100);
3211       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
3212         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3213         return false;
3214       }
3215     }
3216     if (this.server.isStopped()) {
3217       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3218       return false;
3219     }
3220     return true;
3221   }
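       // Illustrative usage (hedged sketch; "am" and "hri" are hypothetical local names):
       //   if (!am.waitOnRegionToClearRegionsInTransition(hri, 30 * 1000)) {
       //     // still in transition (or the master stopped) after the 30s timeout
       //   }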
3222 
3223   /**
3224    * Update timers for all regions in transition going against the server in the
3225    * serversInUpdatingTimer.
3226    */
3227   public class TimerUpdater extends Chore {
3228 
3229     public TimerUpdater(final int period, final Stoppable stopper) {
3230       super("AssignmentTimerUpdater", period, stopper);
3231     }
3232 
3233     @Override
3234     protected void chore() {
3235       Preconditions.checkState(tomActivated);
3236       ServerName serverToUpdateTimer = null;
3237       while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
3238         if (serverToUpdateTimer == null) {
3239           serverToUpdateTimer = serversInUpdatingTimer.first();
3240         } else {
3241           serverToUpdateTimer = serversInUpdatingTimer
3242               .higher(serverToUpdateTimer);
3243         }
3244         if (serverToUpdateTimer == null) {
3245           break;
3246         }
3247         updateTimers(serverToUpdateTimer);
3248         serversInUpdatingTimer.remove(serverToUpdateTimer);
3249       }
3250     }
3251   }
3252 
3253   /**
3254    * Monitor to check for time outs on region transition operations
3255    */
3256   public class TimeoutMonitor extends Chore {
3257     private boolean allRegionServersOffline = false;
3258     private ServerManager serverManager;
3259     private final int timeout;
3260 
3261     /**
3262      * Creates a periodic monitor to check for time outs on region transition
3263      * operations.  This will deal with retries if for some reason something
3264      * doesn't happen within the specified timeout.
3265      * @param period How often this monitor's chore runs.
3266      * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
3267      *   clean up and exit cleanly.
3268      * @param timeout How long a region may stay in transition before the monitor acts on it.
3269      */
3270     public TimeoutMonitor(final int period, final Stoppable stopper,
3271         ServerManager serverManager,
3272         final int timeout) {
3273       super("AssignmentTimeoutMonitor", period, stopper);
3274       this.timeout = timeout;
3275       this.serverManager = serverManager;
3276     }
3277 
3278     private synchronized void setAllRegionServersOffline(
3279       boolean allRegionServersOffline) {
3280       this.allRegionServersOffline = allRegionServersOffline;
3281     }
3282 
3283     @Override
3284     protected void chore() {
3285       Preconditions.checkState(tomActivated);
3286       boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
3287 
3288       // Iterate all regions in transition checking for time outs
3289       long now = System.currentTimeMillis();
3290       // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
3291       //  a copy while another thread is adding/removing items
3292       for (String regionName : regionStates.getRegionsInTransition().keySet()) {
3293         RegionState regionState = regionStates.getRegionTransitionState(regionName);
3294         if (regionState == null) continue;
3295 
3296         if (regionState.getStamp() + timeout <= now) {
3297           // decide on action upon timeout
3298           actOnTimeOut(regionState);
3299         } else if (this.allRegionServersOffline && !noRSAvailable) {
3300           RegionPlan existingPlan = regionPlans.get(regionName);
3301           if (existingPlan == null
3302               || !this.serverManager.isServerOnline(existingPlan
3303                   .getDestination())) {
3304             // if some RSs just came back online, we can start the assignment
3305             // right away
3306             actOnTimeOut(regionState);
3307           }
3308         }
3309       }
3310       setAllRegionServersOffline(noRSAvailable);
3311     }
3312 
3313     private void actOnTimeOut(RegionState regionState) {
3314       HRegionInfo regionInfo = regionState.getRegion();
3315       LOG.info("Regions in transition timed out:  " + regionState);
3316       // Expired! Do a retry.
3317       switch (regionState.getState()) {
3318       case CLOSED:
3319         LOG.info("Region " + regionInfo.getEncodedName()
3320             + " has been CLOSED for too long, waiting on queued "
3321             + "ClosedRegionHandler to run or server shutdown");
3322         // Update our timestamp.
3323         regionState.updateTimestampToNow();
3324         break;
3325       case OFFLINE:
3326         LOG.info("Region has been OFFLINE for too long, " + "reassigning "
3327             + regionInfo.getRegionNameAsString() + " to a random server");
3328         invokeAssign(regionInfo);
3329         break;
3330       case PENDING_OPEN:
3331         LOG.info("Region has been PENDING_OPEN for too "
3332             + "long, reassigning region=" + regionInfo.getRegionNameAsString());
3333         invokeAssign(regionInfo);
3334         break;
3335       case OPENING:
3336         processOpeningState(regionInfo);
3337         break;
3338       case OPEN:
3339         LOG.error("Region has been OPEN for too long, " +
3340             "we don't know where region was opened so can't do anything");
3341         regionState.updateTimestampToNow();
3342         break;
3343 
3344       case PENDING_CLOSE:
3345         LOG.info("Region has been PENDING_CLOSE for too "
3346             + "long, running forced unassign again on region="
3347             + regionInfo.getRegionNameAsString());
3348         invokeUnassign(regionInfo);
3349         break;
3350       case CLOSING:
3351         LOG.info("Region has been CLOSING for too " +
3352           "long, this should eventually complete or the server will " +
3353           "expire, send RPC again");
3354         invokeUnassign(regionInfo);
3355         break;
3356 
3357       case SPLIT:
3358       case SPLITTING:
3359       case FAILED_OPEN:
3360       case FAILED_CLOSE:
3361       case MERGING:
3362         break;
3363 
3364       default:
3365         throw new IllegalStateException("Received event is not valid.");
3366       }
3367     }
3368   }
3369 
3370   private void processOpeningState(HRegionInfo regionInfo) {
3371     LOG.info("Region has been OPENING for too long, reassigning region="
3372         + regionInfo.getRegionNameAsString());
3373     // Should have a ZK node in OPENING state
3374     try {
3375       String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
3376       Stat stat = new Stat();
3377       byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
3378       if (data == null) {
3379         LOG.warn("Data is null, node " + node + " no longer exists");
3380         return;
3381       }
3382       RegionTransition rt = RegionTransition.parseFrom(data);
3383       EventType et = rt.getEventType();
3384       if (et == EventType.RS_ZK_REGION_OPENED) {
3385         LOG.debug("Region has transitioned to OPENED, allowing "
3386             + "watched event handlers to process");
3387         return;
3388       } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
3389         LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
3390         return;
3391       }
3392       invokeAssign(regionInfo);
3393     } catch (KeeperException ke) {
3394       LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
3395     } catch (DeserializationException e) {
3396       LOG.error("Unexpected exception parsing CLOSING region", e);
3397     }
3398   }
3399 
3400   void invokeAssign(HRegionInfo regionInfo) {
3401     invokeAssign(regionInfo, true);
3402   }
3403 
3404   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3405     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3406   }
3407 
3408   void invokeUnAssign(HRegionInfo regionInfo) {
3409     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3410   }
3411 
3412   private void invokeUnassign(HRegionInfo regionInfo) {
3413     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3414   }
3415 
3416   public boolean isCarryingMeta(ServerName serverName) {
3417     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3418   }
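       // Note: callers (e.g. server-shutdown handling) typically consult this to decide
       // whether the dead server was hosting hbase:meta and thus whether meta needs to be
       // reassigned before the server's user regions are processed.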
3419 
3420   /**
3421    * Check if the shutdown server carries the specific region.
3422    * We have a bunch of places that store region locations, and
3423    * those values aren't consistent: there is a delay of notification.
3424    * The location from the zookeeper unassigned node has the most recent data,
3425    * but the node could be deleted after the region is opened by the AM.
3426    * The AM's info could be stale if the OpenedRegionHandler
3427    * hasn't finished processing yet when the server shutdown occurs.
3428    * @return whether the serverName currently hosts the region
3429    */
3430   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3431     RegionTransition rt = null;
3432     try {
3433       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3434       // This call can legitimately come back null
3435       rt = data == null? null: RegionTransition.parseFrom(data);
3436     } catch (KeeperException e) {
3437       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3438     } catch (DeserializationException e) {
3439       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3440     }
3441 
3442     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3443     if (addressFromZK != null) {
3444       // if we get something from ZK, we will use the data
3445       boolean matchZK = addressFromZK.equals(serverName);
3446       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3447         " current=" + serverName + ", matches=" + matchZK);
3448       return matchZK;
3449     }
3450 
3451     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3452     boolean matchAM = (addressFromAM != null &&
3453       addressFromAM.equals(serverName));
3454     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3455       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3456       " server being checked: " + serverName);
3457 
3458     return matchAM;
3459   }
3460 
3461   /**
3462    * Process shutdown server removing any assignments.
3463    * @param sn Server that went down.
3464    * @return list of regions in transition on this server
3465    */
3466   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3467     // Clean out any existing assignment plans for this server
3468     synchronized (this.regionPlans) {
3469       for (Iterator <Map.Entry<String, RegionPlan>> i =
3470           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3471         Map.Entry<String, RegionPlan> e = i.next();
3472         ServerName otherSn = e.getValue().getDestination();
3473         // The name will be null if the region is planned for a random assign.
3474         if (otherSn != null && otherSn.equals(sn)) {
3475           // Use iterator's remove else we'll get CME
3476           i.remove();
3477         }
3478       }
3479     }
3480     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3481     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3482       HRegionInfo hri = it.next();
3483       String encodedName = hri.getEncodedName();
3484 
3485       // We need a lock on the region as we could update it
3486       Lock lock = locker.acquireLock(encodedName);
3487       try {
3488         RegionState regionState =
3489           regionStates.getRegionTransitionState(encodedName);
3490         if (regionState == null
3491             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3492             || !(regionState.isFailedClose() || regionState.isOffline()
3493               || regionState.isPendingOpenOrOpening())) {
3494           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3495             + " on the dead server any more: " + sn);
3496           it.remove();
3497         } else {
3498           try {
3499             // Delete the ZNode if exists
3500             ZKAssign.deleteNodeFailSilent(watcher, hri);
3501           } catch (KeeperException ke) {
3502             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3503           }
3504           if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
3505             regionStates.regionOffline(hri);
3506             it.remove();
3507             continue;
3508           }
3509           // Mark the region offline and assign it again by SSH
3510           regionStates.updateRegionState(hri, State.OFFLINE);
3511         }
3512       } finally {
3513         lock.unlock();
3514       }
3515     }
3516     return regions;
3517   }
3518 
3519   /**
3520    * @param plan Plan to execute.
3521    */
3522   public void balance(final RegionPlan plan) {
3523     HRegionInfo hri = plan.getRegionInfo();
3524     TableName tableName = hri.getTable();
3525     if (zkTable.isDisablingOrDisabledTable(tableName)) {
3526       LOG.info("Ignored moving region of disabling/disabled table "
3527         + tableName);
3528       return;
3529     }
3530 
3531     // Move the region only if it's assigned
3532     String encodedName = hri.getEncodedName();
3533     ReentrantLock lock = locker.acquireLock(encodedName);
3534     try {
3535       if (!regionStates.isRegionOnline(hri)) {
3536         RegionState state = regionStates.getRegionState(encodedName);
3537         LOG.info("Ignored moving region not assigned: " + hri + ", "
3538           + (state == null ? "not in region states" : state));
3539         return;
3540       }
3541       synchronized (this.regionPlans) {
3542         this.regionPlans.put(plan.getRegionName(), plan);
3543       }
3544       unassign(hri, false, plan.getDestination());
3545     } finally {
3546       lock.unlock();
3547     }
3548   }
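       // Illustrative usage (hedged sketch; "am", "hri" and "destination" are assumptions):
       // move a region by installing an explicit plan. The RegionPlan constructor order
       // (region, source, destination) matches its use earlier in this class.
       //   ServerName source = regionStates.getRegionServerOfRegion(hri);
       //   am.balance(new RegionPlan(hri, source, destination));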
3549 
3550   public void stop() {
3551     shutdown(); // Stop executor service, etc
3552     if (tomActivated){
3553       this.timeoutMonitor.interrupt();
3554       this.timerUpdater.interrupt();
3555     }
3556   }
3557 
3558   /**
3559    * Shutdown the threadpool executor service
3560    */
3561   public void shutdown() {
3562     // It's an immediate shutdown, so we're clearing the remaining tasks.
3563     synchronized (zkEventWorkerWaitingList){
3564       zkEventWorkerWaitingList.clear();
3565     }
3566     threadPoolExecutorService.shutdownNow();
3567     zkEventWorkers.shutdownNow();
3568     regionStateStore.stop();
3569   }
3570 
3571   protected void setEnabledTable(TableName tableName) {
3572     try {
3573       this.zkTable.setEnabledTable(tableName);
3574     } catch (KeeperException e) {
3575       // here we can abort as it is the start up flow
3576       String errorMsg = "Unable to ensure that the table " + tableName
3577           + " will be" + " enabled because of a ZooKeeper issue";
3578       LOG.error(errorMsg);
3579       this.server.abort(errorMsg, e);
3580     }
3581   }
3582 
3583   /**
3584    * Set region as OFFLINED up in zookeeper asynchronously.
3585    * @param state
3586    * @return True if we succeeded, false otherwise (State was incorrect or failed
3587    * updating zk).
3588    */
3589   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3590       final AsyncCallback.StringCallback cb, final ServerName destination) {
3591     if (!state.isClosed() && !state.isOffline()) {
3592       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3593         new IllegalStateException());
3594       return false;
3595     }
3596     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3597     try {
3598       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3599         destination, cb, state);
3600     } catch (KeeperException e) {
3601       if (e instanceof NodeExistsException) {
3602         LOG.warn("Node for " + state.getRegion() + " already exists");
3603       } else {
3604         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3605       }
3606       return false;
3607     }
3608     return true;
3609   }
3610 
3611   private boolean deleteNodeInStates(String encodedName,
3612       String desc, ServerName sn, EventType... types) {
3613     try {
3614       for (EventType et: types) {
3615         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3616           return true;
3617         }
3618       }
3619       LOG.info("Failed to delete the " + desc + " node for "
3620         + encodedName + ". The node type may not match");
3621     } catch (NoNodeException e) {
3622       if (LOG.isDebugEnabled()) {
3623         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3624       }
3625     } catch (KeeperException ke) {
3626       server.abort("Unexpected ZK exception deleting " + desc
3627         + " node for the region " + encodedName, ke);
3628     }
3629     return false;
3630   }
3631 
3632   private void deleteMergingNode(String encodedName, ServerName sn) {
3633     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3634       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3635   }
3636 
3637   private void deleteSplittingNode(String encodedName, ServerName sn) {
3638     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3639       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3640   }
3641 
3642   private void onRegionFailedOpen(
3643       final HRegionInfo hri, final ServerName sn) {
3644     String encodedName = hri.getEncodedName();
3645     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3646     if (failedOpenCount == null) {
3647       failedOpenCount = new AtomicInteger();
3648       // No need to use putIfAbsent, or extra synchronization since
3649       // this whole handleRegion block is locked on the encoded region
3650       // name, and failedOpenTracker is updated only in this block
3651       failedOpenTracker.put(encodedName, failedOpenCount);
3652     }
3653     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion() ) {
3654       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3655       // remove the tracking info to save memory, also reset
3656       // the count for next open initiative
3657       failedOpenTracker.remove(encodedName);
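           // From here on the region stays in FAILED_OPEN until something explicitly
           // triggers a new assign; waitForAssignment() above treats FAILED_OPEN as a
           // terminal failure.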
3658     }
3659     else {
3660       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3661         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3662         // so that we are aware of potential problem if it persists for a long time.
3663         LOG.warn("Failed to open the hbase:meta region " +
3664             hri.getRegionNameAsString() + " after " +
3665             failedOpenCount.get() + " retries. Continue retrying.");
3666       }
3667 
3668       // Handle this the same as if it were opened and then closed.
3669       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3670       if (regionState != null) {
3671         // When there are more than one region server a new RS is selected as the
3672         // destination and the same is updated in the region plan. (HBASE-5546)
3673         Set<TableName> disablingOrDisabled = null;
3674         try {
3675           disablingOrDisabled = ZKTable.getDisablingTables(watcher);
3676           disablingOrDisabled.addAll(ZKTable.getDisabledTables(watcher));
3677         } catch (KeeperException e) {
3678           server.abort("Cannot retrieve info about disabling or disabled tables ", e);
3679         }
3680         if (disablingOrDisabled.contains(hri.getTable())) {
3681           offlineDisabledRegion(hri);
3682           return;
3683         }
3684         // ZK Node is in CLOSED state, assign it.
3685         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3686         // This below has to do w/ online enable/disable of a table
3687         removeClosedRegion(hri);
3688         try {
3689           getRegionPlan(hri, sn, true);
3690         } catch (HBaseIOException e) {
3691           LOG.warn("Failed to get region plan", e);
3692         }
3693         invokeAssign(hri, false);
3694       }
3695     }
3696   }
3697 
3698   private void onRegionOpen(
3699       final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3700     regionOnline(hri, sn, openSeqNum);
3701     if (useZKForAssignment) {
3702       try {
3703         // Delete the ZNode if exists
3704         ZKAssign.deleteNodeFailSilent(watcher, hri);
3705       } catch (KeeperException ke) {
3706         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3707       }
3708     }
3709 
3710     // reset the count, if any
3711     failedOpenTracker.remove(hri.getEncodedName());
3712     if (isTableDisabledOrDisabling(hri.getTable())) {
3713       invokeUnAssign(hri);
3714     }
3715   }
3716 
3717   private void onRegionClosed(final HRegionInfo hri) {
3718     if (isTableDisabledOrDisabling(hri.getTable())) {
3719       offlineDisabledRegion(hri);
3720       return;
3721     }
3722     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3723     // This below has to do w/ online enable/disable of a table
3724     removeClosedRegion(hri);
3725     invokeAssign(hri, false);
3726   }
3727 
3728   private String onRegionSplit(ServerName sn, TransitionCode code,
3729       HRegionInfo p, HRegionInfo a, HRegionInfo b) {
3730     RegionState rs_p = regionStates.getRegionState(p);
3731     RegionState rs_a = regionStates.getRegionState(a);
3732     RegionState rs_b = regionStates.getRegionState(b);
3733     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3734         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3735         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3736       return "Not in state good for split";
3737     }
3738 
3739     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3740     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3741     regionStates.updateRegionState(p, State.SPLITTING);
3742 
3743     if (code == TransitionCode.SPLIT) {
3744       if (TEST_SKIP_SPLIT_HANDLING) {
3745         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3746       }
3747       regionOffline(p, State.SPLIT);
3748       regionOnline(a, sn, 1);
3749       regionOnline(b, sn, 1);
3750 
3751       // User could disable the table before master knows the new region.
3752       if (isTableDisabledOrDisabling(p.getTable())) {
3753         invokeUnAssign(a);
3754         invokeUnAssign(b);
3755       }
3756     } else if (code == TransitionCode.SPLIT_PONR) {
3757       try {
3758         regionStateStore.splitRegion(p, a, b, sn);
3759       } catch (IOException ioe) {
3760         LOG.info("Failed to record split region " + p.getShortNameToLog());
3761         return "Failed to record the splitting in meta";
3762       }
3763     } else if (code == TransitionCode.SPLIT_REVERTED) {
3764       regionOnline(p, sn);
3765       regionOffline(a);
3766       regionOffline(b);
3767 
3768       if (isTableDisabledOrDisabling(p.getTable())) {
3769         invokeUnAssign(p);
3770       }
3771     }
3772     return null;
3773   }
3774 
3775   private boolean isTableDisabledOrDisabling(TableName t) {
3776     Set<TableName> disablingOrDisabled = null;
3777     try {
3778       disablingOrDisabled = ZKTable.getDisablingTables(watcher);
3779       disablingOrDisabled.addAll(ZKTable.getDisabledTables(watcher));
3780     } catch (KeeperException e) {
3781       server.abort("Cannot retrieve info about disabling or disabled tables", e);
3782     }
3783     return disablingOrDisabled != null && disablingOrDisabled.contains(t);
3784   }
3785 
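       // Likewise for merges, inferred from the transition codes handled below: a
       // region server first reports READY_TO_MERGE, then MERGE_PONR once the merge
       // has been recorded in meta, and finally MERGED on success or MERGE_REVERTED
       // (the final else branch below) if the merge was rolled back.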
3786   private String onRegionMerge(ServerName sn, TransitionCode code,
3787       HRegionInfo p, HRegionInfo a, HRegionInfo b) {
3788     RegionState rs_p = regionStates.getRegionState(p);
3789     RegionState rs_a = regionStates.getRegionState(a);
3790     RegionState rs_b = regionStates.getRegionState(b);
3791     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3792         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3793       return "Not in state good for merge";
3794     }
3795 
3796     regionStates.updateRegionState(a, State.MERGING);
3797     regionStates.updateRegionState(b, State.MERGING);
3798     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3799 
3800     String encodedName = p.getEncodedName();
3801     if (code == TransitionCode.READY_TO_MERGE) {
3802       mergingRegions.put(encodedName,
3803         new PairOfSameType<HRegionInfo>(a, b));
3804     } else if (code == TransitionCode.MERGED) {
3805       mergingRegions.remove(encodedName);
3806       regionOffline(a, State.MERGED);
3807       regionOffline(b, State.MERGED);
3808       regionOnline(p, sn, 1);
3809 
3810       // User could disable the table before master knows the new region.
3811       if (isTableDisabledOrDisabling(p.getTable())) {
3812         invokeUnAssign(p);
3813       }
3814     } else if (code == TransitionCode.MERGE_PONR) {
3815       try {
3816         regionStateStore.mergeRegions(p, a, b, sn);
3817       } catch (IOException ioe) {
3818         LOG.info("Failed to record merged region " + p.getShortNameToLog(), ioe);
3819         return "Failed to record the merging in meta";
3820       }
3821     } else {
3822       mergingRegions.remove(encodedName);
3823       regionOnline(a, sn);
3824       regionOnline(b, sn);
3825       regionOffline(p);
3826 
3827       if (isTableDisabledOrDisabling(p.getTable())) {
3828         invokeUnAssign(a);
3829         invokeUnAssign(b);
3830       }
3831     }
3832     return null;
3833   }
3834 
3835   /**
3836    * A helper to handle a region merging transition event.
3837    * It transitions the merging regions to the MERGING state.
3838    */
3839   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3840       final String prettyPrintedRegionName, final ServerName sn) {
3841     if (!serverManager.isServerOnline(sn)) {
3842       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3843       return false;
3844     }
3845     byte [] payloadOfMerging = rt.getPayload();
3846     List<HRegionInfo> mergingRegions;
3847     try {
3848       mergingRegions = HRegionInfo.parseDelimitedFrom(
3849         payloadOfMerging, 0, payloadOfMerging.length);
3850     } catch (IOException e) {
3851       LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3852         + " payload for " + prettyPrintedRegionName, e);
3853       return false;
3854     }
3855     assert mergingRegions.size() == 3;
3856     HRegionInfo p = mergingRegions.get(0);
3857     HRegionInfo hri_a = mergingRegions.get(1);
3858     HRegionInfo hri_b = mergingRegions.get(2);
3859 
3860     RegionState rs_p = regionStates.getRegionState(p);
3861     RegionState rs_a = regionStates.getRegionState(hri_a);
3862     RegionState rs_b = regionStates.getRegionState(hri_b);
3863 
3864     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3865         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3866         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3867       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3868         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3869       return false;
3870     }
3871 
3872     EventType et = rt.getEventType();
3873     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3874       try {
3875         if (RegionMergeTransaction.transitionMergingNode(watcher, p,
3876             hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
3877             EventType.RS_ZK_REGION_MERGING) == -1) {
3878           byte[] data = ZKAssign.getData(watcher, encodedName);
3879           EventType currentType = null;
3880           if (data != null) {
3881             RegionTransition newRt = RegionTransition.parseFrom(data);
3882             currentType = newRt.getEventType();
3883           }
3884           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3885               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3886             LOG.warn("Failed to transition pending_merge node "
3887               + encodedName + " to merging, it's now " + currentType);
3888             return false;
3889           }
3890         }
3891       } catch (Exception e) {
3892         LOG.warn("Failed to transition pending_merge node "
3893           + encodedName + " to merging", e);
3894         return false;
3895       }
3896     }
3897 
3898     synchronized (regionStates) {
3899       regionStates.updateRegionState(hri_a, State.MERGING);
3900       regionStates.updateRegionState(hri_b, State.MERGING);
3901       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3902 
3903       if (et != EventType.RS_ZK_REGION_MERGED) {
3904         this.mergingRegions.put(encodedName,
3905           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3906       } else {
3907         this.mergingRegions.remove(encodedName);
3908         regionOffline(hri_a, State.MERGED);
3909         regionOffline(hri_b, State.MERGED);
3910         regionOnline(p, sn);
3911       }
3912     }
3913 
3914     if (et == EventType.RS_ZK_REGION_MERGED) {
3915       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3916       // Remove region from ZK
3917       try {
3918         boolean successful = false;
3919         while (!successful) {
3920           // The RS may update the znode between our read of it and this
3921           // delete, so it's safe to retry.
3922           successful = ZKAssign.deleteNode(watcher, encodedName,
3923             EventType.RS_ZK_REGION_MERGED, sn);
3924         }
3925       } catch (KeeperException e) {
3926         if (e instanceof NoNodeException) {
3927           String znodePath = ZKUtil.joinZNode(watcher.assignmentZNode, encodedName);
3928           LOG.debug("The znode " + znodePath + " does not exist. It may have been deleted already.");
3929         } else {
3930           server.abort("Error deleting MERGED node " + encodedName, e);
3931         }
3932       }
3933       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3934         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3935         + hri_b.getRegionNameAsString() + ", on " + sn);
3936 
3937       // User could disable the table before master knows the new region.
3938       if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3939         unassign(p);
3940       }
3941     }
3942     return true;
3943   }
3944 
3945   /**
3946    * A helper to handle a region splitting transition event.
3947    */
3948   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3949       final String prettyPrintedRegionName, final ServerName sn) {
3950     if (!serverManager.isServerOnline(sn)) {
3951       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3952       return false;
3953     }
3954     byte [] payloadOfSplitting = rt.getPayload();
3955     List<HRegionInfo> splittingRegions;
3956     try {
3957       splittingRegions = HRegionInfo.parseDelimitedFrom(
3958         payloadOfSplitting, 0, payloadOfSplitting.length);
3959     } catch (IOException e) {
3960       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3961         + " payload for " + prettyPrintedRegionName, e);
3962       return false;
3963     }
3964     assert splittingRegions.size() == 2;
3965     HRegionInfo hri_a = splittingRegions.get(0);
3966     HRegionInfo hri_b = splittingRegions.get(1);
3967 
3968     RegionState rs_p = regionStates.getRegionState(encodedName);
3969     RegionState rs_a = regionStates.getRegionState(hri_a);
3970     RegionState rs_b = regionStates.getRegionState(hri_b);
3971 
3972     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3973         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3974         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3975       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
3976         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3977       return false;
3978     }
3979 
3980     if (rs_p == null) {
3981       // Splitting region should be online
3982       rs_p = regionStates.updateRegionState(rt, State.OPEN);
3983       if (rs_p == null) {
3984         LOG.warn("Received splitting for region " + prettyPrintedRegionName
3985           + " from server " + sn + " but it doesn't exist anymore,"
3986           + " probably already processed its split");
3987         return false;
3988       }
3989       regionStates.regionOnline(rs_p.getRegion(), sn);
3990     }
3991 
3992     HRegionInfo p = rs_p.getRegion();
3993     EventType et = rt.getEventType();
3994     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3995       try {
3996         if (SplitTransaction.transitionSplittingNode(watcher, p,
3997             hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
3998             EventType.RS_ZK_REGION_SPLITTING) == -1) {
3999           byte[] data = ZKAssign.getData(watcher, encodedName);
4000           EventType currentType = null;
4001           if (data != null) {
4002             RegionTransition newRt = RegionTransition.parseFrom(data);
4003             currentType = newRt.getEventType();
4004           }
4005           if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
4006               && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4007             LOG.warn("Failed to transition pending_split node "
4008               + encodedName + " to splitting, it's now " + currentType);
4009             return false;
4010           }
4011         }
4012       } catch (Exception e) {
4013         LOG.warn("Failed to transition pending_split node "
4014           + encodedName + " to splitting", e);
4015         return false;
4016       }
4017     }
4018 
4019     synchronized (regionStates) {
4020       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4021       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4022       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4023       regionStates.updateRegionState(rt, State.SPLITTING);
4024 
4025       // The below is for testing ONLY!  We can't do fault injection easily, so
4026       // resort to this kind of ugliness -- St.Ack 02/25/2011.
4027       if (TEST_SKIP_SPLIT_HANDLING) {
4028         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4029         return true; // return true so that the splitting node stays
4030       }
4031 
4032       if (et == EventType.RS_ZK_REGION_SPLIT) {
4033         regionOffline(p, State.SPLIT);
4034         regionOnline(hri_a, sn);
4035         regionOnline(hri_b, sn);
4036         splitRegions.remove(p);
4037       }
4038     }
4039 
4040     if (et == EventType.RS_ZK_REGION_SPLIT) {
4041       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4042       // Remove region from ZK
4043       try {
4044         boolean successful = false;
4045         while (!successful) {
4046           // The RS may update the znode between our read of it and this
4047           // delete, so it's safe to retry.
4048           successful = ZKAssign.deleteNode(watcher, encodedName,
4049             EventType.RS_ZK_REGION_SPLIT, sn);
4050         }
4051       } catch (KeeperException e) {
4052         if (e instanceof NoNodeException) {
4053           String znodePath = ZKUtil.joinZNode(watcher.assignmentZNode, encodedName);
4054           LOG.debug("The znode " + znodePath + " does not exist. It may have been deleted already.");
4055         } else {
4056           server.abort("Error deleting SPLIT node " + encodedName, e);
4057         }
4058       }
4059       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4060         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4061         + hri_b.getRegionNameAsString() + ", on " + sn);
4062 
4063       // User could disable the table before master knows the new region.
4064       if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
4065         unassign(hri_a);
4066         unassign(hri_b);
4067       }
4068     }
4069     return true;
4070   }
4071 
4072   /**
4073    * Marks a region as offline.  The new state is the specified one,
4074    * if not null.  If the specified state is null, the new state is Offline.
4075    * The specified state can only be Split/Merged/Offline/null.
4076    */
4077   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4078     regionStates.regionOffline(regionInfo, state);
4079     removeClosedRegion(regionInfo);
4080     // remove the region plan as well just in case.
4081     clearRegionPlan(regionInfo);
4082     balancer.regionOffline(regionInfo);
4083 
4084     // Tell our listeners that a region was closed
4085     sendRegionClosedNotification(regionInfo);
4086   }
4087 
4088   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4089       final ServerName serverName) {
4090     if (!this.listeners.isEmpty()) {
4091       for (AssignmentListener listener : this.listeners) {
4092         listener.regionOpened(regionInfo, serverName);
4093       }
4094     }
4095   }
4096 
4097   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4098     if (!this.listeners.isEmpty()) {
4099       for (AssignmentListener listener : this.listeners) {
4100         listener.regionClosed(regionInfo);
4101       }
4102     }
4103   }
4104 
4105   /**
4106    * Try to update some region states. If the state machine prevents
4107    * such an update, an error message is returned to explain the reason.
4108    *
4109    * It's expected that each transition carries just one region
4110    * for opening/closing, and three regions for splitting/merging.
4111    * These regions should be on the server that requested the change.
4112    *
4113    * Region state machine. Only these transitions
4114    * are expected to be triggered by a region server.
4115    *
4116    * On the state transition:
4117    *  (1) Open/Close should be initiated by master
4118    *  (1) Open/Close should be initiated by the master
4119    *      (a) The master sets the region to pending_open/pending_close
4120    *        in memory and hbase:meta after sending the request
4121    *        to the region server
4122    *      (b) The region server reports back to the master
4123    *        after the open/close is done (either success or failure)
4124    *      (c) If the region server has a problem reporting the status
4125    *        to the master, it must be because the master is down or there
4126    *        is a temporary network issue. Otherwise, the region server should
4127    *        abort since it must be a bug. If the master is not accessible,
4128    *        the region server should keep trying until the server is
4129    *        stopped or until the status is reported to the (new) master
4130    *      (d) If a region server dies in the middle of opening/closing
4131    *        a region, SSH picks it up and finishes it
4132    *      (e) If the master dies in the middle, the new master recovers
4133    *        the state during initialization from hbase:meta. A region server
4134    *        can report any transition that has not been reported to
4135    *        the previous active master yet
4135    *  (2) Split/merge is initiated by region servers
4136    *      (a) To split a region, a region server sends a request
4137    *        to the master to try to set a region to splitting, together with
4138    *        two daughters (to be created) to splitting new. If approved
4139    *        by the master, the splitting can then move ahead
4140    *      (b) To merge two regions, a region server sends a request to
4141    *        the master to try to set the new merged region (to be created) to
4142    *        merging_new, together with two regions (to be merged) to merging.
4143    *        If the master agrees, the merge can then move ahead
4144    *      (c) Once the splitting/merging is done, the region server
4145    *        reports the status (either success or failure) back to the master.
4146    *      (d) Other scenarios should be handled similarly to
4147    *        region open/close
4148    */
4149   protected String onRegionTransition(final ServerName serverName,
4150       final RegionStateTransition transition) {
4151     TransitionCode code = transition.getTransitionCode();
4152     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4153     RegionState current = regionStates.getRegionState(hri);
4154     if (LOG.isDebugEnabled()) {
4155       LOG.debug("Got transition " + code + " for "
4156         + (current != null ? current.toString() : hri.getShortNameToLog())
4157         + " from " + serverName);
4158     }
4159     String errorMsg = null;
4160     switch (code) {
4161     case OPENED:
4162       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4163         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4164             + serverName);
4165         break;
4166       }
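           // Intentional fall-through: a normal OPENED report is validated and
           // handled together with FAILED_OPEN in the block below.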
4167     case FAILED_OPEN:
4168       if (current == null
4169           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4170         errorMsg = hri.getShortNameToLog()
4171           + " is not pending open on " + serverName;
4172       } else if (code == TransitionCode.FAILED_OPEN) {
4173         onRegionFailedOpen(hri, serverName);
4174       } else {
4175         long openSeqNum = HConstants.NO_SEQNUM;
4176         if (transition.hasOpenSeqNum()) {
4177           openSeqNum = transition.getOpenSeqNum();
4178         }
4179         if (openSeqNum < 0) {
4180           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4181         } else {
4182           onRegionOpen(hri, serverName, openSeqNum);
4183         }
4184       }
4185       break;
4186 
4187     case CLOSED:
4188       if (current == null
4189           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4190         errorMsg = hri.getShortNameToLog()
4191           + " is not pending close on " + serverName;
4192       } else {
4193         onRegionClosed(hri);
4194       }
4195       break;
4196 
4197     case READY_TO_SPLIT:
4198     case SPLIT_PONR:
4199     case SPLIT:
4200     case SPLIT_REVERTED:
4201       errorMsg = onRegionSplit(serverName, code, hri,
4202         HRegionInfo.convert(transition.getRegionInfo(1)),
4203         HRegionInfo.convert(transition.getRegionInfo(2)));
4204       break;
4205 
4206     case READY_TO_MERGE:
4207     case MERGE_PONR:
4208     case MERGED:
4209     case MERGE_REVERTED:
4210       errorMsg = onRegionMerge(serverName, code, hri,
4211         HRegionInfo.convert(transition.getRegionInfo(1)),
4212         HRegionInfo.convert(transition.getRegionInfo(2)));
4213       break;
4214 
4215     default:
4216       errorMsg = "Unexpected transition code " + code;
4217     }
4218     if (errorMsg != null) {
4219       LOG.error("Failed to transition region from " + current + " to "
4220         + code + " by " + serverName + ": " + errorMsg);
4221     }
4222     return errorMsg;
4223   }
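       // Illustrative sketch only, not upstream code: how the RegionStateTransition
       // handled by onRegionTransition above might be built on the reporting side.
       // The builder method names are assumed from the accessors used above
       // (getTransitionCode, getRegionInfo, hasOpenSeqNum/getOpenSeqNum):
       //
       //   RegionStateTransition transition = RegionStateTransition.newBuilder()
       //       .setTransitionCode(TransitionCode.OPENED)
       //       .addRegionInfo(HRegionInfo.convert(hri))   // hri: the opened region
       //       .setOpenSeqNum(openSeqNum)
       //       .build();
       //   // onRegionTransition returns null if the master accepted the transition,
       //   // or an error message explaining why the state machine rejected it.
       //   String errorMsg = onRegionTransition(serverName, transition);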
4224 
4225   /**
4226    * @return Instance of load balancer
4227    */
4228   public LoadBalancer getBalancer() {
4229     return this.balancer;
4230   }
4231 }