View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.io.PrintWriter;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.InvocationTargetException;
26  import java.net.InetAddress;
27  import java.net.InetSocketAddress;
28  import java.net.UnknownHostException;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.Comparator;
34  import java.util.HashSet;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.Callable;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicReference;
44  
45  import javax.management.ObjectName;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.apache.hadoop.hbase.classification.InterfaceAudience;
50  import org.apache.hadoop.conf.Configuration;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.hbase.Abortable;
53  import org.apache.hadoop.hbase.Chore;
54  import org.apache.hadoop.hbase.ClusterId;
55  import org.apache.hadoop.hbase.ClusterStatus;
56  import org.apache.hadoop.hbase.DoNotRetryIOException;
57  import org.apache.hadoop.hbase.HBaseIOException;
58  import org.apache.hadoop.hbase.HColumnDescriptor;
59  import org.apache.hadoop.hbase.HConstants;
60  import org.apache.hadoop.hbase.HRegionInfo;
61  import org.apache.hadoop.hbase.HTableDescriptor;
62  import org.apache.hadoop.hbase.HealthCheckChore;
63  import org.apache.hadoop.hbase.MasterNotRunningException;
64  import org.apache.hadoop.hbase.NamespaceDescriptor;
65  import org.apache.hadoop.hbase.NamespaceNotFoundException;
66  import org.apache.hadoop.hbase.PleaseHoldException;
67  import org.apache.hadoop.hbase.Server;
68  import org.apache.hadoop.hbase.ServerLoad;
69  import org.apache.hadoop.hbase.ServerName;
70  import org.apache.hadoop.hbase.TableDescriptors;
71  import org.apache.hadoop.hbase.TableName;
72  import org.apache.hadoop.hbase.TableNotDisabledException;
73  import org.apache.hadoop.hbase.TableNotFoundException;
74  import org.apache.hadoop.hbase.UnknownRegionException;
75  import org.apache.hadoop.hbase.catalog.CatalogTracker;
76  import org.apache.hadoop.hbase.catalog.MetaReader;
77  import org.apache.hadoop.hbase.client.HConnectionManager;
78  import org.apache.hadoop.hbase.client.MetaScanner;
79  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
80  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
81  import org.apache.hadoop.hbase.client.Result;
82  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
83  import org.apache.hadoop.hbase.exceptions.DeserializationException;
84  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
85  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
86  import org.apache.hadoop.hbase.executor.ExecutorService;
87  import org.apache.hadoop.hbase.executor.ExecutorType;
88  import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
89  import org.apache.hadoop.hbase.ipc.RpcServer;
90  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
91  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
92  import org.apache.hadoop.hbase.ipc.ServerRpcController;
93  import org.apache.hadoop.hbase.master.RegionState.State;
94  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
95  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
96  import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
97  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
98  import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
99  import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
100 import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
101 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
102 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
103 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
104 import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
105 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
106 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
107 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
108 import org.apache.hadoop.hbase.master.handler.TruncateTableHandler;
109 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
110 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
111 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
112 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
113 import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
114 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
115 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
116 import org.apache.hadoop.hbase.protobuf.RequestConverter;
117 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
118 import org.apache.hadoop.hbase.protobuf.generated.*;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
124 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
182 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
183 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
184 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
185 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
186 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
187 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
188 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
189 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
190 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
191 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
192 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
193 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
194 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
195 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
196 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
197 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
198 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
199 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
200 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
201 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
202 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
203 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
204 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
205 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
206 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
207 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
208 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
209 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
212 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
213 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
214 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
215 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
216 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
217 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
218 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
219 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
220 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
221 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
222 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
223 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
224 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
225 import org.apache.hadoop.hbase.replication.regionserver.Replication;
226 import org.apache.hadoop.hbase.security.UserProvider;
227 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
228 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
229 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
230 import org.apache.hadoop.hbase.util.Bytes;
231 import org.apache.hadoop.hbase.util.CompressionTest;
232 import org.apache.hadoop.hbase.util.ConfigUtil;
233 import org.apache.hadoop.hbase.util.FSTableDescriptors;
234 import org.apache.hadoop.hbase.util.EncryptionTest;
235 import org.apache.hadoop.hbase.util.FSUtils;
236 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
237 import org.apache.hadoop.hbase.util.HasThread;
238 import org.apache.hadoop.hbase.util.InfoServer;
239 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
240 import org.apache.hadoop.hbase.util.Pair;
241 import org.apache.hadoop.hbase.util.Sleeper;
242 import org.apache.hadoop.hbase.util.Strings;
243 import org.apache.hadoop.hbase.util.Threads;
244 import org.apache.hadoop.hbase.util.VersionInfo;
245 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
246 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
247 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
248 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
249 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
250 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
251 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
252 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
253 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
254 import org.apache.hadoop.metrics.util.MBeanUtil;
255 import org.apache.hadoop.net.DNS;
256 import org.apache.zookeeper.KeeperException;
257 import org.apache.zookeeper.Watcher;
258 
259 import com.google.common.collect.Lists;
260 import com.google.common.collect.Maps;
261 import com.google.protobuf.Descriptors;
262 import com.google.protobuf.Message;
263 import com.google.protobuf.RpcCallback;
264 import com.google.protobuf.RpcController;
265 import com.google.protobuf.Service;
266 import com.google.protobuf.ServiceException;
267 
268 /**
269  * HMaster is the "master server" for HBase. An HBase cluster has one active
270  * master.  If many masters are started, all compete.  Whichever wins goes on to
271  * run the cluster.  All others park themselves in their constructor until
272  * master or cluster shutdown or until the active master loses its lease in
273  * zookeeper.  Thereafter, all running master jostle to take over master role.
274  *
275  * <p>The Master can be asked shutdown the cluster. See {@link #shutdown()}.  In
276  * this case it will tell all regionservers to go down and then wait on them
277  * all reporting in that they are down.  This master will then shut itself down.
278  *
279  * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
280  *
281  * @see Watcher
282  */
283 @InterfaceAudience.Private
284 @SuppressWarnings("deprecation")
285 public class HMaster extends HasThread implements MasterProtos.MasterService.BlockingInterface,
286 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface,
287 MasterServices, Server {
288   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
289 
290   /**
291    * Protection against zombie master. Started once Master accepts active responsibility and
292    * starts taking over responsibilities. Allows a finite time window before giving up ownership.
293    */
294   private static class InitializationMonitor extends HasThread {
295     /** The amount of time in milliseconds to sleep before checking initialization status. */
296     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
297     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
298 
299     /**
300      * When timeout expired and initialization has not complete, call {@link System#exit(int)} when
301      * true, do nothing otherwise.
302      */
303     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
304     public static final boolean HALT_DEFAULT = false;
305 
306     private final HMaster master;
307     private final long timeout;
308     private final boolean haltOnTimeout;
309 
310     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
311     InitializationMonitor(HMaster master) {
312       super("MasterInitializationMonitor");
313       this.master = master;
314       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
315       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
316       this.setDaemon(true);
317     }
318 
319     @Override
320     public void run() {
321       try {
322         while (!master.isStopped() && master.isActiveMaster()) {
323           Thread.sleep(timeout);
324           if (master.isInitialized()) {
325             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
326           } else {
327             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
328                 + " consider submitting a bug report including a thread dump of this process.");
329             if (haltOnTimeout) {
330               LOG.error("Zombie Master exiting. Thread dump to stdout");
331               Threads.printThreadInfo(System.out, "Zombie HMaster");
332               System.exit(-1);
333             }
334           }
335         }
336       } catch (InterruptedException ie) {
337         LOG.trace("InitMonitor thread interrupted. Existing.");
338       }
339     }
340   }
341 
342   // MASTER is name of the webapp and the attribute name used stuffing this
343   //instance into web context.
344   public static final String MASTER = "master";
345 
346   // The configuration for the Master
347   private final Configuration conf;
348   // server for the web ui
349   private InfoServer infoServer;
350 
351   // Our zk client.
352   private ZooKeeperWatcher zooKeeper;
353   // Manager and zk listener for master election
354   private ActiveMasterManager activeMasterManager;
355   // Region server tracker
356   RegionServerTracker regionServerTracker;
357   // Draining region server tracker
358   private DrainingServerTracker drainingServerTracker;
359   // Tracker for load balancer state
360   private LoadBalancerTracker loadBalancerTracker;
361   // master address tracker
362   private MasterAddressTracker masterAddressTracker;
363 
364   // RPC server for the HMaster
365   private final RpcServerInterface rpcServer;
366   private JvmPauseMonitor pauseMonitor;
367   // Set after we've called HBaseServer#openServer and ready to receive RPCs.
368   // Set back to false after we stop rpcServer.  Used by tests.
369   private volatile boolean rpcServerOpen = false;
370 
371   /** Namespace stuff */
372   private TableNamespaceManager tableNamespaceManager;
373 
374   /**
375    * This servers address.
376    */
377   private final InetSocketAddress isa;
378 
379   // Metrics for the HMaster
380   private final MetricsMaster metricsMaster;
381   // file system manager for the master FS operations
382   private MasterFileSystem fileSystemManager;
383 
384   // server manager to deal with region server info
385   ServerManager serverManager;
386 
387   // manager of assignment nodes in zookeeper
388   AssignmentManager assignmentManager;
389   // manager of catalog regions
390   private CatalogTracker catalogTracker;
391   // Cluster status zk tracker and local setter
392   private ClusterStatusTracker clusterStatusTracker;
393 
394   // buffer for "fatal error" notices from region servers
395   // in the cluster. This is only used for assisting
396   // operations/debugging.
397   private MemoryBoundedLogMessageBuffer rsFatals;
398 
399   // This flag is for stopping this Master instance.  Its set when we are
400   // stopping or aborting
401   private volatile boolean stopped = false;
402   // Set on abort -- usually failure of our zk session.
403   private volatile boolean abort = false;
404   // flag set after we become the active master (used for testing)
405   private volatile boolean isActiveMaster = false;
406 
407   // flag set after we complete initialization once active,
408   // it is not private since it's used in unit tests
409   volatile boolean initialized = false;
410 
411   // flag set after we complete assignMeta.
412   private volatile boolean serverShutdownHandlerEnabled = false;
413 
414   // Instance of the hbase executor service.
415   ExecutorService executorService;
416 
417   private LoadBalancer balancer;
418   private Thread balancerChore;
419   private Thread clusterStatusChore;
420   private ClusterStatusPublisher clusterStatusPublisherChore = null;
421 
422   private CatalogJanitor catalogJanitorChore;
423   private LogCleaner logCleaner;
424   private HFileCleaner hfileCleaner;
425 
426   private MasterCoprocessorHost cpHost;
427   private final ServerName serverName;
428 
429   private final boolean preLoadTableDescriptors;
430 
431   private TableDescriptors tableDescriptors;
432 
433   // Table level lock manager for schema changes
434   private TableLockManager tableLockManager;
435 
436   // Time stamps for when a hmaster was started and when it became active
437   private long masterStartTime;
438   private long masterActiveTime;
439 
440   /** time interval for emitting metrics values */
441   private final int msgInterval;
442   /**
443    * MX Bean for MasterInfo
444    */
445   private ObjectName mxBean = null;
446 
447   // should we check the compression codec type at master side, default true, HBASE-6370
448   private final boolean masterCheckCompression;
449 
450   // should we check encryption settings at master side, default true
451   private final boolean masterCheckEncryption;
452 
453   private SpanReceiverHost spanReceiverHost;
454 
455   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
456 
457   // monitor for snapshot of hbase tables
458   private SnapshotManager snapshotManager;
459   // monitor for distributed procedures
460   private MasterProcedureManagerHost mpmHost;
461 
462   /** The health check chore. */
463   private HealthCheckChore healthCheckChore;
464 
465   /** flag used in test cases in order to simulate RS failures during master initialization */
466   private volatile boolean initializationBeforeMetaAssignment = false;
467 
468   /** The following is used in master recovery scenario to re-register listeners */
469   private List<ZooKeeperListener> registeredZKListenersBeforeRecovery;
470 
471   /**
472    * Initializes the HMaster. The steps are as follows:
473    * <p>
474    * <ol>
475    * <li>Initialize HMaster RPC and address
476    * <li>Connect to ZooKeeper.
477    * </ol>
478    * <p>
479    * Remaining steps of initialization occur in {@link #run()} so that they
480    * run in their own thread rather than within the context of the constructor.
481    * @throws InterruptedException
482    */
483   public HMaster(final Configuration conf)
484   throws IOException, KeeperException, InterruptedException {
485     this.conf = new Configuration(conf);
486     // Disable the block cache on the master
487     this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
488     FSUtils.setupShortCircuitRead(conf);
489     // Server to handle client requests.
490     String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
491       conf.get("hbase.master.dns.interface", "default"),
492       conf.get("hbase.master.dns.nameserver", "default")));
493     int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
494     // Test that the hostname is reachable
495     InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
496     if (initialIsa.getAddress() == null) {
497       throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
498     }
499     // Verify that the bind address is reachable if set
500     String bindAddress = conf.get("hbase.master.ipc.address");
501     if (bindAddress != null) {
502       initialIsa = new InetSocketAddress(bindAddress, port);
503       if (initialIsa.getAddress() == null) {
504         throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
505       }
506     }
507     String name = "master/" + initialIsa.toString();
508     // Set how many times to retry talking to another server over HConnection.
509     HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
510     int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
511       conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
512     this.rpcServer = new RpcServer(this, name, getServices(),
513       initialIsa, // BindAddress is IP we got for this server.
514       conf,
515       new FifoRpcScheduler(conf, numHandlers));
516     // Set our address.
517     this.isa = this.rpcServer.getListenerAddress();
518     // We don't want to pass isa's hostname here since it could be 0.0.0.0
519     this.serverName = ServerName.valueOf(hostname, this.isa.getPort(), System.currentTimeMillis());
520     this.rsFatals = new MemoryBoundedLogMessageBuffer(
521       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
522 
523     // login the zookeeper client principal (if using security)
524     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
525       "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
526 
527     // initialize server principal (if using secure Hadoop)
528     UserProvider provider = UserProvider.instantiate(conf);
529     provider.login("hbase.master.keytab.file",
530       "hbase.master.kerberos.principal", this.isa.getHostName());
531 
532     LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
533         ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
534 
535     // set the thread name now we have an address
536     setName(MASTER + ":" + this.serverName.toShortString());
537 
538     Replication.decorateMasterConfiguration(this.conf);
539 
540     // Hack! Maps DFSClient => Master for logs.  HDFS made this
541     // config param for task trackers, but we can piggyback off of it.
542     if (this.conf.get("mapred.task.id") == null) {
543       this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
544     }
545 
546     this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
547     this.rpcServer.startThreads();
548     this.pauseMonitor = new JvmPauseMonitor(conf);
549     this.pauseMonitor.start();
550 
551     // metrics interval: using the same property as region server.
552     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
553 
554     // should we check the compression codec type at master side, default true, HBASE-6370
555     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
556 
557     // should we check encryption settings at master side, default true
558     this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
559 
560     this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
561 
562     // preload table descriptor at startup
563     this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
564 
565     // Health checker thread.
566     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
567       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
568     if (isHealthCheckerConfigured()) {
569       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
570     }
571 
572     // Do we publish the status?
573     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
574         HConstants.STATUS_PUBLISHED_DEFAULT);
575     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
576         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
577             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
578             ClusterStatusPublisher.Publisher.class);
579 
580     if (shouldPublish) {
581       if (publisherClass == null) {
582         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
583             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
584             " is not set - not publishing status");
585       } else {
586         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
587         Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
588       }
589     }
590   }
591 
592   /**
593    * @return list of blocking services and their security info classes that this server supports
594    */
595   private List<BlockingServiceAndInterface> getServices() {
596     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(3);
597     bssi.add(new BlockingServiceAndInterface(
598         MasterProtos.MasterService.newReflectiveBlockingService(this),
599         MasterProtos.MasterService.BlockingInterface.class));
600     bssi.add(new BlockingServiceAndInterface(
601         RegionServerStatusProtos.RegionServerStatusService.newReflectiveBlockingService(this),
602         RegionServerStatusProtos.RegionServerStatusService.BlockingInterface.class));
603     return bssi;
604   }
605 
606   /**
607    * Stall startup if we are designated a backup master; i.e. we want someone
608    * else to become the master before proceeding.
609    * @param c configuration
610    * @param amm
611    * @throws InterruptedException
612    */
613   private static void stallIfBackupMaster(final Configuration c,
614       final ActiveMasterManager amm)
615   throws InterruptedException {
616     // If we're a backup master, stall until a primary to writes his address
617     if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
618       HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
619       return;
620     }
621     LOG.debug("HMaster started in backup mode.  " +
622       "Stalling until master znode is written.");
623     // This will only be a minute or so while the cluster starts up,
624     // so don't worry about setting watches on the parent znode
625     while (!amm.isActiveMaster()) {
626       LOG.debug("Waiting for master address ZNode to be written " +
627         "(Also watching cluster state node)");
628       Thread.sleep(
629         c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT));
630     }
631 
632   }
633 
634   MetricsMaster getMetrics() {
635     return metricsMaster;
636   }
637 
638   /**
639    * Main processing loop for the HMaster.
640    * <ol>
641    * <li>Block until becoming active master
642    * <li>Finish initialization via finishInitialization(MonitoredTask)
643    * <li>Enter loop until we are stopped
644    * <li>Stop services and perform cleanup once stopped
645    * </ol>
646    */
647   @Override
648   public void run() {
649     MonitoredTask startupStatus =
650       TaskMonitor.get().createStatus("Master startup");
651     startupStatus.setDescription("Master startup");
652     masterStartTime = System.currentTimeMillis();
653     try {
654       this.masterAddressTracker = new MasterAddressTracker(getZooKeeperWatcher(), this);
655       this.masterAddressTracker.start();
656 
657       // Put up info server.
658       int port = this.conf.getInt("hbase.master.info.port", 60010);
659       if (port >= 0) {
660         String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
661         this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
662         this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
663         this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
664         this.infoServer.setAttribute(MASTER, this);
665         this.infoServer.start();
666       }
667 
668       this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners();
669       /*
670        * Block on becoming the active master.
671        *
672        * We race with other masters to write our address into ZooKeeper.  If we
673        * succeed, we are the primary/active master and finish initialization.
674        *
675        * If we do not succeed, there is another active master and we should
676        * now wait until it dies to try and become the next active master.  If we
677        * do not succeed on our first attempt, this is no longer a cluster startup.
678        */
679       becomeActiveMaster(startupStatus);
680 
681       // We are either the active master or we were asked to shutdown
682       if (!this.stopped) {
683         finishInitialization(startupStatus, false);
684         loop();
685       }
686     } catch (Throwable t) {
687       // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
688       if (t instanceof NoClassDefFoundError &&
689           t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
690           // improved error message for this special case
691           abort("HBase is having a problem with its Hadoop jars.  You may need to "
692               + "recompile HBase against Hadoop version "
693               +  org.apache.hadoop.util.VersionInfo.getVersion()
694               + " or change your hadoop jars to start properly", t);
695       } else {
696         abort("Unhandled exception. Starting shutdown.", t);
697       }
698     } finally {
699       startupStatus.cleanup();
700 
701       stopChores();
702       // Wait for all the remaining region servers to report in IFF we were
703       // running a cluster shutdown AND we were NOT aborting.
704       if (!this.abort && this.serverManager != null &&
705           this.serverManager.isClusterShutdown()) {
706         this.serverManager.letRegionServersShutdown();
707       }
708       stopServiceThreads();
709       // Stop services started for both backup and active masters
710       if (this.activeMasterManager != null) this.activeMasterManager.stop();
711       if (this.catalogTracker != null) this.catalogTracker.stop();
712       if (this.serverManager != null) this.serverManager.stop();
713       if (this.assignmentManager != null) this.assignmentManager.stop();
714       if (this.fileSystemManager != null) this.fileSystemManager.stop();
715       if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
716       this.zooKeeper.close();
717     }
718     LOG.info("HMaster main thread exiting");
719   }
720 
721   /**
722    * Useful for testing purpose also where we have
723    * master restart scenarios.
724    */
725   protected void startCatalogJanitorChore() {
726     Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
727   }
728 
729   /**
730    * Try becoming active master.
731    * @param startupStatus
732    * @return True if we could successfully become the active master.
733    * @throws InterruptedException
734    */
735   private boolean becomeActiveMaster(MonitoredTask startupStatus)
736   throws InterruptedException {
737     // TODO: This is wrong!!!! Should have new servername if we restart ourselves,
738     // if we come back to life.
739     this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
740         this);
741     this.zooKeeper.registerListener(activeMasterManager);
742     stallIfBackupMaster(this.conf, this.activeMasterManager);
743 
744     // The ClusterStatusTracker is setup before the other
745     // ZKBasedSystemTrackers because it's needed by the activeMasterManager
746     // to check if the cluster should be shutdown.
747     this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
748     this.clusterStatusTracker.start();
749     return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
750   }
751 
752   /**
753    * Initialize all ZK based system trackers.
754    * @throws IOException
755    * @throws InterruptedException
756    */
757   void initializeZKBasedSystemTrackers() throws IOException,
758       InterruptedException, KeeperException {
759     this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
760     this.catalogTracker.start();
761 
762     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
763     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
764     this.loadBalancerTracker.start();
765     this.assignmentManager = new AssignmentManager(this, serverManager,
766       this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
767       this.tableLockManager);
768     zooKeeper.registerListenerFirst(assignmentManager);
769 
770     this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
771         this.serverManager);
772     this.regionServerTracker.start();
773 
774     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
775       this.serverManager);
776     this.drainingServerTracker.start();
777 
778     // Set the cluster as up.  If new RSs, they'll be waiting on this before
779     // going ahead with their startup.
780     boolean wasUp = this.clusterStatusTracker.isClusterUp();
781     if (!wasUp) this.clusterStatusTracker.setClusterUp();
782 
783     LOG.info("Server active/primary master=" + this.serverName +
784         ", sessionid=0x" +
785         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
786         ", setting cluster-up flag (Was=" + wasUp + ")");
787 
788     // create/initialize the snapshot manager and other procedure managers
789     this.snapshotManager = new SnapshotManager();
790     this.mpmHost = new MasterProcedureManagerHost();
791     this.mpmHost.register(this.snapshotManager);
792     this.mpmHost.loadProcedures(conf);
793     this.mpmHost.initialize(this, this.metricsMaster);
794   }
795 
796   /**
797    * Create CatalogTracker.
798    * In its own method so can intercept and mock it over in tests.
799    * @param zk If zk is null, we'll create an instance (and shut it down
800    * when {@link #stop(String)} is called) else we'll use what is passed.
801    * @param conf
802    * @param abortable If fatal exception we'll call abort on this.  May be null.
803    * If it is we'll use the Connection associated with the passed
804    * {@link Configuration} as our {@link Abortable}.
805    * ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
806    * @throws IOException
807    */
808   CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
809       final Configuration conf, Abortable abortable)
810   throws IOException {
811     return new CatalogTracker(zk, conf, abortable);
812   }
813 
814   // Check if we should stop every 100ms
815   private Sleeper stopSleeper = new Sleeper(100, this);
816 
817   private void loop() {
818     long lastMsgTs = 0l;
819     long now = 0l;
820     while (!this.stopped) {
821       now = System.currentTimeMillis();
822       if ((now - lastMsgTs) >= this.msgInterval) {
823         doMetrics();
824         lastMsgTs = System.currentTimeMillis();
825       }
826       stopSleeper.sleep();
827     }
828   }
829 
830   /**
831    * Emit the HMaster metrics, such as region in transition metrics.
832    * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
833    */
834   private void doMetrics() {
835     try {
836       this.assignmentManager.updateRegionsInTransitionMetrics();
837     } catch (Throwable e) {
838       LOG.error("Couldn't update metrics: " + e.getMessage());
839     }
840   }
841 
842   /**
843    * Finish initialization of HMaster after becoming the primary master.
844    *
845    * <ol>
846    * <li>Initialize master components - file system manager, server manager,
847    *     assignment manager, region server tracker, catalog tracker, etc</li>
848    * <li>Start necessary service threads - rpc server, info server,
849    *     executor services, etc</li>
850    * <li>Set cluster as UP in ZooKeeper</li>
851    * <li>Wait for RegionServers to check-in</li>
852    * <li>Split logs and perform data recovery, if necessary</li>
853    * <li>Ensure assignment of meta regions<li>
854    * <li>Handle either fresh cluster start or master failover</li>
855    * </ol>
856    *
857    * @param masterRecovery
858    *
859    * @throws IOException
860    * @throws InterruptedException
861    * @throws KeeperException
862    */
863   private void finishInitialization(MonitoredTask status, boolean masterRecovery)
864   throws IOException, InterruptedException, KeeperException {
865 
866     isActiveMaster = true;
867     Thread zombieDetector = new Thread(new InitializationMonitor(this));
868     zombieDetector.start();
869 
870     /*
871      * We are active master now... go initialize components we need to run.
872      * Note, there may be dross in zk from previous runs; it'll get addressed
873      * below after we determine if cluster startup or failover.
874      */
875 
876     status.setStatus("Initializing Master file system");
877 
878     this.masterActiveTime = System.currentTimeMillis();
879     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
880     this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);
881 
882     this.tableDescriptors =
883       new FSTableDescriptors(this.conf, this.fileSystemManager.getFileSystem(),
884       this.fileSystemManager.getRootDir());
885 
886     // enable table descriptors cache
887     this.tableDescriptors.setCacheOn();
888 
889     // warm-up HTDs cache on master initialization
890     if (preLoadTableDescriptors) {
891       status.setStatus("Pre-loading table descriptors");
892       this.tableDescriptors.getAll();
893     }
894 
895     // publish cluster ID
896     status.setStatus("Publishing Cluster ID in ZooKeeper");
897     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
898 
899     if (!masterRecovery) {
900       this.executorService = new ExecutorService(getServerName().toShortString());
901       this.serverManager = createServerManager(this, this);
902     }
903 
904     //Initialize table lock manager, and ensure that all write locks held previously
905     //are invalidated
906     this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
907     if (!masterRecovery) {
908       this.tableLockManager.reapWriteLocks();
909     }
910 
911     status.setStatus("Initializing ZK system trackers");
912     initializeZKBasedSystemTrackers();
913 
914     if (!masterRecovery) {
915       // initialize master side coprocessors before we start handling requests
916       status.setStatus("Initializing master coprocessors");
917       this.cpHost = new MasterCoprocessorHost(this, this.conf);
918 
919       spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
920 
921       // start up all service threads.
922       status.setStatus("Initializing master service threads");
923       startServiceThreads();
924     }
925 
926     // Wait for region servers to report in.
927     this.serverManager.waitForRegionServers(status);
928     // Check zk for region servers that are up but didn't register
929     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
930       // The isServerOnline check is opportunistic, correctness is handled inside
931       if (!this.serverManager.isServerOnline(sn)
932           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
933         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
934       }
935     }
936 
937     if (!masterRecovery) {
938       this.assignmentManager.startTimeOutMonitor();
939     }
940 
941     // get a list for previously failed RS which need log splitting work
942     // we recover hbase:meta region servers inside master initialization and
943     // handle other failed servers in SSH in order to start up master node ASAP
944     Set<ServerName> previouslyFailedServers = this.fileSystemManager
945         .getFailedServersFromLogFolders();
946 
947     // remove stale recovering regions from previous run
948     this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
949 
950     // log splitting for hbase:meta server
951     ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
952     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
953       splitMetaLogBeforeAssignment(oldMetaServerLocation);
954       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
955       // may also host user regions
956     }
957     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
958     // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
959     // instead of previouslyFailedMetaRSs alone to address the following two situations:
960     // 1) the chained failure situation(recovery failed multiple times in a row).
961     // 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
962     // same server still has non-meta wals to be replayed so that
963     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
964     // Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
965     // there is no op for the server.
966     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
967 
968     this.initializationBeforeMetaAssignment = true;
969 
970     //initialize load balancer
971     this.balancer.setClusterStatus(getClusterStatus());
972     this.balancer.setMasterServices(this);
973     this.balancer.initialize();
974 
975     // Make sure meta assigned before proceeding.
976     status.setStatus("Assigning Meta Region");
977     assignMeta(status, previouslyFailedMetaRSs);
978     // check if master is shutting down because above assignMeta could return even hbase:meta isn't
979     // assigned when master is shutting down
980     if(this.stopped) return;
981 
982     status.setStatus("Submitting log splitting work for previously failed region servers");
983     // Master has recovered hbase:meta region server and we put
984     // other failed region servers in a queue to be handled later by SSH
985     for (ServerName tmpServer : previouslyFailedServers) {
986       this.serverManager.processDeadServer(tmpServer, true);
987     }
988 
989     // Update meta with new PB serialization if required. i.e migrate all HRI to PB serialization
990     // in meta. This must happen before we assign all user regions or else the assignment will
991     // fail.
992     if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
993       org.apache.hadoop.hbase.catalog.MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
994     }
995 
996     // Fix up assignment manager status
997     status.setStatus("Starting assignment manager");
998     this.assignmentManager.joinCluster();
999 
1000     //set cluster status again after user regions are assigned
1001     this.balancer.setClusterStatus(getClusterStatus());
1002 
1003     if (!masterRecovery) {
1004       // Start balancer and meta catalog janitor after meta and regions have
1005       // been assigned.
1006       status.setStatus("Starting balancer and catalog janitor");
1007       this.clusterStatusChore = getAndStartClusterStatusChore(this);
1008       this.balancerChore = getAndStartBalancerChore(this);
1009       this.catalogJanitorChore = new CatalogJanitor(this, this);
1010       startCatalogJanitorChore();
1011     }
1012 
1013     status.setStatus("Starting namespace manager");
1014     initNamespace();
1015 
1016     if (this.cpHost != null) {
1017       try {
1018         this.cpHost.preMasterInitialization();
1019       } catch (IOException e) {
1020         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
1021       }
1022     }
1023 
1024     status.markComplete("Initialization successful");
1025     LOG.info("Master has completed initialization");
1026     initialized = true;
1027     // clear the dead servers with same host name and port of online server because we are not
1028     // removing dead server with same hostname and port of rs which is trying to check in before
1029     // master initialization. See HBASE-5916.
1030     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
1031 
1032     if (!masterRecovery) {
1033       if (this.cpHost != null) {
1034         // don't let cp initialization errors kill the master
1035         try {
1036           this.cpHost.postStartMaster();
1037         } catch (IOException ioe) {
1038           LOG.error("Coprocessor postStartMaster() hook failed", ioe);
1039         }
1040       }
1041     }
1042 
1043     zombieDetector.interrupt();
1044   }
1045 
1046   /**
1047    * Create a {@link ServerManager} instance.
1048    * @param master
1049    * @param services
1050    * @return An instance of {@link ServerManager}
1051    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
1052    * @throws IOException
1053    */
1054   ServerManager createServerManager(final Server master,
1055       final MasterServices services)
1056   throws IOException {
1057     // We put this out here in a method so can do a Mockito.spy and stub it out
1058     // w/ a mocked up ServerManager.
1059     return new ServerManager(master, services);
1060   }
1061 
1062   /**
1063    * Check <code>hbase:meta</code> is assigned. If not, assign it.
1064    * @param status MonitoredTask
1065    * @param previouslyFailedMetaRSs
1066    * @throws InterruptedException
1067    * @throws IOException
1068    * @throws KeeperException
1069    */
1070   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs)
1071       throws InterruptedException, IOException, KeeperException {
1072     // Work on meta region
1073     int assigned = 0;
1074     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
1075     status.setStatus("Assigning hbase:meta region");
1076 
1077     RegionStates regionStates = assignmentManager.getRegionStates();
1078     
1079     RegionState regionState = this.catalogTracker.getMetaRegionState();
1080     ServerName currentMetaServer = regionState.getServerName();
1081     
1082     if (!ConfigUtil.useZKForAssignment(conf)) {
1083       regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO, regionState.getState(),
1084         currentMetaServer);
1085     } else {
1086       regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
1087     }
1088     boolean rit =
1089      this.assignmentManager
1090          .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
1091     boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
1092     if (!metaRegionLocation || !regionState.isOpened()) {
1093       // Meta location is not verified. It should be in transition, or offline.
1094       // We will wait for it to be assigned in enableSSHandWaitForMeta below.
1095       assigned++;
1096       if (!ConfigUtil.useZKForAssignment(conf)) {
1097         assignMetaZkLess(regionStates, regionState, timeout, previouslyFailedMetaRSs);
1098       } else if (!rit) {
1099         // Assign meta since not already in transition
1100         if (currentMetaServer != null) {
1101           // If the meta server is not known to be dead or online,
1102           // just split the meta log, and don't expire it since this
1103           // could be a full cluster restart. Otherwise, we will think
1104           // this is a failover and lose previous region locations.
1105           // If it is really a failover case, AM will find out in rebuilding
1106           // user regions. Otherwise, we are good since all logs are split
1107           // or known to be replayed before user regions are assigned.
1108           if (serverManager.isServerOnline(currentMetaServer)) {
1109             LOG.info("Forcing expire of " + currentMetaServer);
1110             serverManager.expireServer(currentMetaServer);
1111           }
1112           splitMetaLogBeforeAssignment(currentMetaServer);
1113           previouslyFailedMetaRSs.add(currentMetaServer);
1114         }
1115         assignmentManager.assignMeta();
1116       }
1117     } else {
1118       // Region already assigned. We didn't assign it. Add to in-memory state.
1119       regionStates.updateRegionState(
1120         HRegionInfo.FIRST_META_REGIONINFO, State.OPEN, currentMetaServer);
1121       this.assignmentManager.regionOnline(
1122         HRegionInfo.FIRST_META_REGIONINFO, currentMetaServer);
1123     }
1124 
1125     enableMeta(TableName.META_TABLE_NAME);
1126 
1127     if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
1128         && (!previouslyFailedMetaRSs.isEmpty())) {
1129       // replay WAL edits mode need new hbase:meta RS is assigned firstly
1130       status.setStatus("replaying log for Meta Region");
1131       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
1132     }
1133 
1134     // Make sure a hbase:meta location is set. We need to enable SSH here since
1135     // if the meta region server is died at this time, we need it to be re-assigned
1136     // by SSH so that system tables can be assigned.
1137     // No need to wait for meta is assigned = 0 when meta is just verified.
1138     enableServerShutdownHandler(assigned != 0);
1139 
1140     LOG.info("hbase:meta assigned=" + assigned + ", rit=" + rit +
1141       ", location=" + catalogTracker.getMetaLocation());
1142     status.setStatus("META assigned.");
1143   }
1144 
1145   private void assignMetaZkLess(RegionStates regionStates, RegionState regionState, long timeout,
1146       Set<ServerName> previouslyFailedRs) throws IOException, KeeperException {
1147     ServerName currentServer = regionState.getServerName();
1148     if (serverManager.isServerOnline(currentServer)) {
1149       LOG.info("Meta was in transition on " + currentServer);
1150       assignmentManager.processRegionInTransitionZkLess();
1151     } else {
1152       if (currentServer != null) {
1153         splitMetaLogBeforeAssignment(currentServer);
1154         regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
1155         previouslyFailedRs.add(currentServer);
1156       }
1157       LOG.info("Re-assigning hbase:meta, it was on " + currentServer);
1158       regionStates.updateRegionState(HRegionInfo.FIRST_META_REGIONINFO, State.OFFLINE);
1159       assignmentManager.assignMeta();
1160     }
1161   }
1162   
1163   void initNamespace() throws IOException {
1164     //create namespace manager
1165     tableNamespaceManager = new TableNamespaceManager(this);
1166     tableNamespaceManager.start();
1167   }
1168 
1169   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
1170     if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
1171       // In log replay mode, we mark hbase:meta region as recovering in ZK
1172       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1173       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1174       this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
1175     } else {
1176       // In recovered.edits mode: create recovered edits file for hbase:meta server
1177       this.fileSystemManager.splitMetaLog(currentMetaServer);
1178     }
1179   }
1180 
1181   private void enableServerShutdownHandler(
1182       final boolean waitForMeta) throws IOException, InterruptedException {
1183     // If ServerShutdownHandler is disabled, we enable it and expire those dead
1184     // but not expired servers. This is required so that if meta is assigning to
1185     // a server which dies after assignMeta starts assignment,
1186     // SSH can re-assign it. Otherwise, we will be
1187     // stuck here waiting forever if waitForMeta is specified.
1188     if (!serverShutdownHandlerEnabled) {
1189       serverShutdownHandlerEnabled = true;
1190       this.serverManager.processQueuedDeadServers();
1191     }
1192 
1193     if (waitForMeta) {
1194       this.catalogTracker.waitForMeta();
1195       // Above check waits for general meta availability but this does not
1196       // guarantee that the transition has completed
1197       this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1198     }
1199   }
1200 
1201   private void enableMeta(TableName metaTableName) {
1202     if (!this.assignmentManager.getZKTable().isEnabledTable(metaTableName)) {
1203       this.assignmentManager.setEnabledTable(metaTableName);
1204     }
1205   }
1206 
1207   /**
1208    * This function returns a set of region server names under hbase:meta recovering region ZK node
1209    * @return Set of meta server names which were recorded in ZK
1210    * @throws KeeperException
1211    */
1212   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1213     Set<ServerName> result = new HashSet<ServerName>();
1214     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1215       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1216     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1217     if (regionFailedServers == null) return result;
1218 
1219     for(String failedServer : regionFailedServers) {
1220       ServerName server = ServerName.parseServerName(failedServer);
1221       result.add(server);
1222     }
1223     return result;
1224   }
1225 
1226   @Override
1227   public TableDescriptors getTableDescriptors() {
1228     return this.tableDescriptors;
1229   }
1230 
1231   /** @return InfoServer object. Maybe null.*/
1232   public InfoServer getInfoServer() {
1233     return this.infoServer;
1234   }
1235 
1236   @Override
1237   public Configuration getConfiguration() {
1238     return this.conf;
1239   }
1240 
1241   @Override
1242   public ServerManager getServerManager() {
1243     return this.serverManager;
1244   }
1245 
1246   @Override
1247   public ExecutorService getExecutorService() {
1248     return this.executorService;
1249   }
1250 
1251   @Override
1252   public MasterFileSystem getMasterFileSystem() {
1253     return this.fileSystemManager;
1254   }
1255 
1256   /**
1257    * Get the ZK wrapper object - needed by master_jsp.java
1258    * @return the zookeeper wrapper
1259    */
1260   public ZooKeeperWatcher getZooKeeperWatcher() {
1261     return this.zooKeeper;
1262   }
1263 
1264   public ActiveMasterManager getActiveMasterManager() {
1265     return this.activeMasterManager;
1266   }
1267 
1268   public MasterAddressTracker getMasterAddressTracker() {
1269     return this.masterAddressTracker;
1270   }
1271 
1272   /*
1273    * Start up all services. If any of these threads gets an unhandled exception
1274    * then they just die with a logged message.  This should be fine because
1275    * in general, we do not expect the master to get such unhandled exceptions
1276    *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1277    *  need to install an unexpected exception handler.
1278    */
1279   void startServiceThreads() throws IOException{
1280    // Start the executor service pools
1281    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1282       conf.getInt("hbase.master.executor.openregion.threads", 5));
1283    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1284       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1285    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1286       conf.getInt("hbase.master.executor.serverops.threads", 5));
1287    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1288       conf.getInt("hbase.master.executor.serverops.threads", 5));
1289    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1290       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1291 
1292    // We depend on there being only one instance of this executor running
1293    // at a time.  To do concurrency, would need fencing of enable/disable of
1294    // tables.
1295    // Any time changing this maxThreads to > 1, pls see the comment at
1296    // AccessController#postCreateTableHandler
1297    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1298 
1299    // Start log cleaner thread
1300    String n = Thread.currentThread().getName();
1301    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1302    this.logCleaner =
1303       new LogCleaner(cleanerInterval,
1304          this, conf, getMasterFileSystem().getFileSystem(),
1305          getMasterFileSystem().getOldLogDir());
1306          Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
1307 
1308    //start the hfile archive cleaner thread
1309     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1310     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1311         .getFileSystem(), archiveDir);
1312     Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
1313 
1314     // Start the health checker
1315     if (this.healthCheckChore != null) {
1316       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
1317     }
1318 
1319     // Start allowing requests to happen.
1320     this.rpcServer.openServer();
1321     this.rpcServerOpen = true;
1322     if (LOG.isTraceEnabled()) {
1323       LOG.trace("Started service threads");
1324     }
1325   }
1326 
1327   /**
1328    * Use this when trying to figure when its ok to send in rpcs.  Used by tests.
1329    * @return True if we have successfully run {@link RpcServer#openServer()}
1330    */
1331   boolean isRpcServerOpen() {
1332     return this.rpcServerOpen;
1333   }
1334 
1335   private void stopServiceThreads() {
1336     if (LOG.isDebugEnabled()) {
1337       LOG.debug("Stopping service threads");
1338     }
1339     if (this.rpcServer != null) this.rpcServer.stop();
1340     this.rpcServerOpen = false;
1341     // Clean up and close up shop
1342     if (this.logCleaner!= null) this.logCleaner.interrupt();
1343     if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
1344 
1345     if (this.infoServer != null) {
1346       LOG.info("Stopping infoServer");
1347       try {
1348         this.infoServer.stop();
1349       } catch (Exception ex) {
1350         ex.printStackTrace();
1351       }
1352     }
1353     if (this.executorService != null) this.executorService.shutdown();
1354     if (this.healthCheckChore != null) {
1355       this.healthCheckChore.interrupt();
1356     }
1357     if (this.pauseMonitor != null) {
1358       this.pauseMonitor.stop();
1359     }
1360   }
1361 
1362   private static Thread getAndStartClusterStatusChore(HMaster master) {
1363     if (master == null || master.balancer == null) {
1364       return null;
1365     }
1366     Chore chore = new ClusterStatusChore(master, master.balancer);
1367     return Threads.setDaemonThreadRunning(chore.getThread());
1368   }
1369 
1370   private static Thread getAndStartBalancerChore(final HMaster master) {
1371     // Start up the load balancer chore
1372     Chore chore = new BalancerChore(master);
1373     return Threads.setDaemonThreadRunning(chore.getThread());
1374   }
1375 
1376   private void stopChores() {
1377     if (this.balancerChore != null) {
1378       this.balancerChore.interrupt();
1379     }
1380     if (this.clusterStatusChore != null) {
1381       this.clusterStatusChore.interrupt();
1382     }
1383     if (this.catalogJanitorChore != null) {
1384       this.catalogJanitorChore.interrupt();
1385     }
1386     if (this.clusterStatusPublisherChore != null){
1387       clusterStatusPublisherChore.interrupt();
1388     }
1389   }
1390 
1391   @Override
1392   public RegionServerStartupResponse regionServerStartup(
1393       RpcController controller, RegionServerStartupRequest request) throws ServiceException {
1394     // Register with server manager
1395     try {
1396       InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
1397       ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
1398         request.getServerStartCode(), request.getServerCurrentTime());
1399 
1400       // Send back some config info
1401       RegionServerStartupResponse.Builder resp = createConfigurationSubset();
1402       NameStringPair.Builder entry = NameStringPair.newBuilder()
1403         .setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
1404         .setValue(rs.getHostname());
1405       resp.addMapEntries(entry.build());
1406 
1407       return resp.build();
1408     } catch (IOException ioe) {
1409       throw new ServiceException(ioe);
1410     }
1411   }
1412 
1413   /**
1414    * @return Get remote side's InetAddress
1415    * @throws UnknownHostException
1416    */
1417   InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1418   throws UnknownHostException {
1419     // Do it out here in its own little method so can fake an address when
1420     // mocking up in tests.
1421     return RpcServer.getRemoteIp();
1422   }
1423 
1424   /**
1425    * @return Subset of configuration to pass initializing regionservers: e.g.
1426    * the filesystem to use and root directory to use.
1427    */
1428   protected RegionServerStartupResponse.Builder createConfigurationSubset() {
1429     RegionServerStartupResponse.Builder resp = addConfig(
1430       RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
1431     resp = addConfig(resp, "fs.default.name");
1432     return addConfig(resp, "hbase.master.info.port");
1433   }
1434 
1435   private RegionServerStartupResponse.Builder addConfig(
1436       final RegionServerStartupResponse.Builder resp, final String key) {
1437     NameStringPair.Builder entry = NameStringPair.newBuilder()
1438       .setName(key)
1439       .setValue(this.conf.get(key));
1440     resp.addMapEntries(entry.build());
1441     return resp;
1442   }
1443 
1444   @Override
1445   public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
1446       GetLastFlushedSequenceIdRequest request) throws ServiceException {
1447     byte[] regionName = request.getRegionName().toByteArray();
1448     long seqId = serverManager.getLastFlushedSequenceId(regionName);
1449     return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
1450   }
1451 
1452   @Override
1453   public RegionServerReportResponse regionServerReport(
1454       RpcController controller, RegionServerReportRequest request) throws ServiceException {
1455     try {
1456       ClusterStatusProtos.ServerLoad sl = request.getLoad();
1457       ServerName serverName = ProtobufUtil.toServerName(request.getServer());
1458       ServerLoad oldLoad = serverManager.getLoad(serverName);
1459       this.serverManager.regionServerReport(serverName, new ServerLoad(sl));
1460       if (sl != null && this.metricsMaster != null) {
1461         // Up our metrics.
1462         this.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
1463           - (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0));
1464       }
1465     } catch (IOException ioe) {
1466       throw new ServiceException(ioe);
1467     }
1468 
1469     return RegionServerReportResponse.newBuilder().build();
1470   }
1471 
1472   @Override
1473   public ReportRSFatalErrorResponse reportRSFatalError(
1474       RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
1475     String errorText = request.getErrorMessage();
1476     ServerName sn = ProtobufUtil.toServerName(request.getServer());
1477     String msg = "Region server " + sn +
1478       " reported a fatal error:\n" + errorText;
1479     LOG.error(msg);
1480     rsFatals.add(msg);
1481 
1482     return ReportRSFatalErrorResponse.newBuilder().build();
1483   }
1484 
1485   public boolean isMasterRunning() {
1486     return !isStopped();
1487   }
1488 
1489   @Override
1490   public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
1491   throws ServiceException {
1492     return IsMasterRunningResponse.newBuilder().setIsMasterRunning(isMasterRunning()).build();
1493   }
1494 
1495   @Override
1496   public RunCatalogScanResponse runCatalogScan(RpcController c,
1497       RunCatalogScanRequest req) throws ServiceException {
1498     try {
1499       return ResponseConverter.buildRunCatalogScanResponse(catalogJanitorChore.scan());
1500     } catch (IOException ioe) {
1501       throw new ServiceException(ioe);
1502     }
1503   }
1504 
1505   @Override
1506   public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
1507       EnableCatalogJanitorRequest req) throws ServiceException {
1508     return EnableCatalogJanitorResponse.newBuilder().
1509         setPrevValue(catalogJanitorChore.setEnabled(req.getEnable())).build();
1510   }
1511 
1512   @Override
1513   public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
1514       IsCatalogJanitorEnabledRequest req) throws ServiceException {
1515     boolean isEnabled = catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1516     return IsCatalogJanitorEnabledResponse.newBuilder().setValue(isEnabled).build();
1517   }
1518 
1519   /**
1520    * @return Maximum time we should run balancer for
1521    */
1522   private int getBalancerCutoffTime() {
1523     int balancerCutoffTime =
1524       getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1525     if (balancerCutoffTime == -1) {
1526       // No time period set so create one
1527       int balancerPeriod =
1528         getConfiguration().getInt("hbase.balancer.period", 300000);
1529       balancerCutoffTime = balancerPeriod;
1530       // If nonsense period, set it to balancerPeriod
1531       if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
1532     }
1533     return balancerCutoffTime;
1534   }
1535 
1536   public boolean balance() throws HBaseIOException {
1537     // if master not initialized, don't run balancer.
1538     if (!this.initialized) {
1539       LOG.debug("Master has not been initialized, don't run balancer.");
1540       return false;
1541     }
1542     // Do this call outside of synchronized block.
1543     int maximumBalanceTime = getBalancerCutoffTime();
1544     boolean balancerRan;
1545     synchronized (this.balancer) {
1546       // If balance not true, don't run balancer.
1547       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1548       // Only allow one balance run at at time.
1549       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1550         Map<String, RegionState> regionsInTransition =
1551           this.assignmentManager.getRegionStates().getRegionsInTransition();
1552         LOG.debug("Not running balancer because " + regionsInTransition.size() +
1553           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1554             abbreviate(regionsInTransition.toString(), 256));
1555         return false;
1556       }
1557       if (this.serverManager.areDeadServersInProgress()) {
1558         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1559           this.serverManager.getDeadServers());
1560         return false;
1561       }
1562 
1563       if (this.cpHost != null) {
1564         try {
1565           if (this.cpHost.preBalance()) {
1566             LOG.debug("Coprocessor bypassing balancer request");
1567             return false;
1568           }
1569         } catch (IOException ioe) {
1570           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1571           return false;
1572         }
1573       }
1574 
1575       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1576         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1577 
1578       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1579       //Give the balancer the current cluster state.
1580       this.balancer.setClusterStatus(getClusterStatus());
1581       for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
1582         List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
1583         if (partialPlans != null) plans.addAll(partialPlans);
1584       }
1585       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1586       int rpCount = 0;  // number of RegionPlans balanced so far
1587       long totalRegPlanExecTime = 0;
1588       balancerRan = plans != null;
1589       if (plans != null && !plans.isEmpty()) {
1590         for (RegionPlan plan: plans) {
1591           LOG.info("balance " + plan);
1592           long balStartTime = System.currentTimeMillis();
1593           //TODO: bulk assign
1594           this.assignmentManager.balance(plan);
1595           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1596           rpCount++;
1597           if (rpCount < plans.size() &&
1598               // if performing next balance exceeds cutoff time, exit the loop
1599               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1600             //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
1601             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1602               maximumBalanceTime);
1603             break;
1604           }
1605         }
1606       }
1607       if (this.cpHost != null) {
1608         try {
1609           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1610         } catch (IOException ioe) {
1611           // balancing already succeeded so don't change the result
1612           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1613         }
1614       }
1615     }
1616     return balancerRan;
1617   }
1618 
1619   @Override
1620   public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
1621     try {
1622       return BalanceResponse.newBuilder().setBalancerRan(balance()).build();
1623     } catch (HBaseIOException ex) {
1624       throw new ServiceException(ex);
1625     }
1626   }
1627 
1628   enum BalanceSwitchMode {
1629     SYNC,
1630     ASYNC
1631   }
1632 
1633   /**
1634    * Assigns balancer switch according to BalanceSwitchMode
1635    * @param b new balancer switch
1636    * @param mode BalanceSwitchMode
1637    * @return old balancer switch
1638    */
1639   public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
1640     boolean oldValue = this.loadBalancerTracker.isBalancerOn();
1641     boolean newValue = b;
1642     try {
1643       if (this.cpHost != null) {
1644         newValue = this.cpHost.preBalanceSwitch(newValue);
1645       }
1646       try {
1647         if (mode == BalanceSwitchMode.SYNC) {
1648           synchronized (this.balancer) {
1649             this.loadBalancerTracker.setBalancerOn(newValue);
1650           }
1651         } else {
1652           this.loadBalancerTracker.setBalancerOn(newValue);
1653         }
1654       } catch (KeeperException ke) {
1655         throw new IOException(ke);
1656       }
1657       LOG.info(getClientIdAuditPrefix() + " set balanceSwitch=" + newValue);
1658       if (this.cpHost != null) {
1659         this.cpHost.postBalanceSwitch(oldValue, newValue);
1660       }
1661     } catch (IOException ioe) {
1662       LOG.warn("Error flipping balance switch", ioe);
1663     }
1664     return oldValue;
1665   }
1666 
1667   /**
1668    * @return Client info for use as prefix on an audit log string; who did an action
1669    */
1670   String getClientIdAuditPrefix() {
1671     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1672   }
1673 
1674   public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
1675     return switchBalancer(b, BalanceSwitchMode.SYNC);
1676   }
1677 
1678   public boolean balanceSwitch(final boolean b) throws IOException {
1679     return switchBalancer(b, BalanceSwitchMode.ASYNC);
1680   }
1681 
1682   @Override
1683   public SetBalancerRunningResponse setBalancerRunning(
1684       RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
1685     try {
1686       boolean prevValue = (req.getSynchronous())?
1687         synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn());
1688       return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
1689     } catch (IOException ioe) {
1690       throw new ServiceException(ioe);
1691     }
1692   }
1693 
1694   /**
1695    * Switch for the background CatalogJanitor thread.
1696    * Used for testing.  The thread will continue to run.  It will just be a noop
1697    * if disabled.
1698    * @param b If false, the catalog janitor won't do anything.
1699    */
1700   public void setCatalogJanitorEnabled(final boolean b) {
1701     this.catalogJanitorChore.setEnabled(b);
1702   }
1703 
1704   @Override
1705   public DispatchMergingRegionsResponse dispatchMergingRegions(
1706       RpcController controller, DispatchMergingRegionsRequest request)
1707       throws ServiceException {
1708     final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
1709         .toByteArray();
1710     final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
1711         .toByteArray();
1712     final boolean forcible = request.getForcible();
1713     if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
1714         || request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
1715       LOG.warn("mergeRegions specifier type: expected: "
1716           + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
1717           + request.getRegionA().getType() + ", region_b="
1718           + request.getRegionB().getType());
1719     }
1720     RegionState regionStateA = assignmentManager.getRegionStates()
1721         .getRegionState(Bytes.toString(encodedNameOfRegionA));
1722     RegionState regionStateB = assignmentManager.getRegionStates()
1723         .getRegionState(Bytes.toString(encodedNameOfRegionB));
1724     if (regionStateA == null || regionStateB == null) {
1725       throw new ServiceException(new UnknownRegionException(
1726           Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
1727               : encodedNameOfRegionB)));
1728     }
1729 
1730     if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
1731       throw new ServiceException(new MergeRegionException(
1732         "Unable to merge regions not online " + regionStateA + ", " + regionStateB));
1733     }
1734 
1735     HRegionInfo regionInfoA = regionStateA.getRegion();
1736     HRegionInfo regionInfoB = regionStateB.getRegion();
1737     if (regionInfoA.compareTo(regionInfoB) == 0) {
1738       throw new ServiceException(new MergeRegionException(
1739         "Unable to merge a region to itself " + regionInfoA + ", " + regionInfoB));
1740     }
1741 
1742     if (!forcible && !HRegionInfo.areAdjacent(regionInfoA, regionInfoB)) {
1743       throw new ServiceException(new MergeRegionException(
1744         "Unable to merge not adjacent regions "
1745           + regionInfoA.getRegionNameAsString() + ", "
1746           + regionInfoB.getRegionNameAsString()
1747           + " where forcible = " + forcible));
1748     }
1749 
1750     try {
1751       dispatchMergingRegions(regionInfoA, regionInfoB, forcible);
1752     } catch (IOException ioe) {
1753       throw new ServiceException(ioe);
1754     }
1755 
1756     return DispatchMergingRegionsResponse.newBuilder().build();
1757   }
1758 
1759   @Override
1760   public void dispatchMergingRegions(final HRegionInfo region_a,
1761       final HRegionInfo region_b, final boolean forcible) throws IOException {
1762     checkInitialized();
1763     this.executorService.submit(new DispatchMergingRegionHandler(this,
1764         this.catalogJanitorChore, region_a, region_b, forcible));
1765   }
1766 
1767   @Override
1768   public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
1769   throws ServiceException {
1770     final byte [] encodedRegionName = req.getRegion().getValue().toByteArray();
1771     RegionSpecifierType type = req.getRegion().getType();
1772     final byte [] destServerName = (req.hasDestServerName())?
1773       Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName()):null;
1774     MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
1775 
1776     if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
1777       LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
1778         + " actual: " + type);
1779     }
1780 
1781     try {
1782       move(encodedRegionName, destServerName);
1783     } catch (HBaseIOException ioe) {
1784       throw new ServiceException(ioe);
1785     }
1786     return mrr;
1787   }
1788 
1789   void move(final byte[] encodedRegionName,
1790       final byte[] destServerName) throws HBaseIOException {
1791     RegionState regionState = assignmentManager.getRegionStates().
1792       getRegionState(Bytes.toString(encodedRegionName));
1793     if (regionState == null) {
1794       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1795     }
1796 
1797     HRegionInfo hri = regionState.getRegion();
1798     ServerName dest;
1799     if (destServerName == null || destServerName.length == 0) {
1800       LOG.info("Passed destination servername is null/empty so " +
1801         "choosing a server at random");
1802       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1803         regionState.getServerName());
1804       dest = balancer.randomAssignment(hri, destServers);
1805     } else {
1806       dest = ServerName.valueOf(Bytes.toString(destServerName));
1807       if (dest.equals(regionState.getServerName())) {
1808         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1809           + " because region already assigned to the same server " + dest + ".");
1810         return;
1811       }
1812     }
1813 
1814     // Now we can do the move
1815     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1816 
1817     try {
1818       checkInitialized();
1819       if (this.cpHost != null) {
1820         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1821           return;
1822         }
1823       }
1824       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1825       this.assignmentManager.balance(rp);
1826       if (this.cpHost != null) {
1827         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1828       }
1829     } catch (IOException ioe) {
1830       if (ioe instanceof HBaseIOException) {
1831         throw (HBaseIOException)ioe;
1832       }
1833       throw new HBaseIOException(ioe);
1834     }
1835   }
1836 
1837   @Override
1838   public void createTable(HTableDescriptor hTableDescriptor,
1839     byte [][] splitKeys)
1840   throws IOException {
1841     if (!isMasterRunning()) {
1842       throw new MasterNotRunningException();
1843     }
1844 
1845     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1846     getNamespaceDescriptor(namespace); // ensure namespace exists
1847 
1848     HRegionInfo[] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
1849     checkInitialized();
1850     sanityCheckTableDescriptor(hTableDescriptor);
1851     if (cpHost != null) {
1852       cpHost.preCreateTable(hTableDescriptor, newRegions);
1853     }
1854     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1855     this.executorService.submit(new CreateTableHandler(this,
1856       this.fileSystemManager, hTableDescriptor, conf,
1857       newRegions, this).prepare());
1858     if (cpHost != null) {
1859       cpHost.postCreateTable(hTableDescriptor, newRegions);
1860     }
1861 
1862   }
1863 
1864   /**
1865    * Checks whether the table conforms to some sane limits, and configured
1866    * values (compression, etc) work. Throws an exception if something is wrong.
1867    * @throws IOException
1868    */
1869   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1870     final String CONF_KEY = "hbase.table.sanity.checks";
1871     if (!conf.getBoolean(CONF_KEY, true)) {
1872       return;
1873     }
1874     String tableVal = htd.getConfigurationValue(CONF_KEY);
1875     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1876       return;
1877     }
1878 
1879     // check max file size
1880     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1881     long maxFileSize = htd.getMaxFileSize();
1882     if (maxFileSize < 0) {
1883       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1884     }
1885     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1886       throw new DoNotRetryIOException("MAX_FILESIZE for table descriptor or "
1887         + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1888         + ") is too small, which might cause over splitting into unmanageable "
1889         + "number of regions. Set " + CONF_KEY + " to false at conf or table descriptor "
1890           + "if you want to bypass sanity checks");
1891     }
1892 
1893     // check flush size
1894     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1895     long flushSize = htd.getMemStoreFlushSize();
1896     if (flushSize < 0) {
1897       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1898     }
1899     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1900       throw new DoNotRetryIOException("MEMSTORE_FLUSHSIZE for table descriptor or "
1901           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1902           + " very frequent flushing. Set " + CONF_KEY + " to false at conf or table descriptor "
1903           + "if you want to bypass sanity checks");
1904     }
1905 
1906     // check that coprocessors and other specified plugin classes can be loaded
1907     try {
1908       checkClassLoading(conf, htd);
1909     } catch (Exception ex) {
1910       throw new DoNotRetryIOException(ex);
1911     }
1912 
1913     // check compression can be loaded
1914     try {
1915       checkCompression(htd);
1916     } catch (IOException e) {
1917       throw new DoNotRetryIOException(e.getMessage(), e);
1918     }
1919 
1920     // check encryption can be loaded
1921     try {
1922       checkEncryption(conf, htd);
1923     } catch (IOException e) {
1924       throw new DoNotRetryIOException(e.getMessage(), e);
1925     }
1926 
1927     // check that we have at least 1 CF
1928     if (htd.getColumnFamilies().length == 0) {
1929       throw new DoNotRetryIOException("Table should have at least one column family "
1930           + "Set "+CONF_KEY+" at conf or table descriptor if you want to bypass sanity checks");
1931     }
1932 
1933     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1934       if (hcd.getTimeToLive() <= 0) {
1935         throw new DoNotRetryIOException("TTL for column family " + hcd.getNameAsString()
1936           + "  must be positive. Set " + CONF_KEY + " to false at conf or table descriptor "
1937           + "if you want to bypass sanity checks");
1938       }
1939 
1940       // check blockSize
1941       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1942         throw new DoNotRetryIOException("Block size for column family " + hcd.getNameAsString()
1943           + "  must be between 1K and 16MB Set "+CONF_KEY+" to false at conf or table descriptor "
1944           + "if you want to bypass sanity checks");
1945       }
1946 
1947       // check versions
1948       if (hcd.getMinVersions() < 0) {
1949         throw new DoNotRetryIOException("Min versions for column family " + hcd.getNameAsString()
1950           + "  must be positive. Set " + CONF_KEY + " to false at conf or table descriptor "
1951           + "if you want to bypass sanity checks");
1952       }
1953       // max versions already being checked
1954 
1955       // check replication scope
1956       if (hcd.getScope() < 0) {
1957         throw new DoNotRetryIOException("Replication scope for column family "
1958           + hcd.getNameAsString() + "  must be positive. Set " + CONF_KEY + " to false at conf "
1959           + "or table descriptor if you want to bypass sanity checks");
1960       }
1961 
1962       // TODO: should we check coprocessors and encryption ?
1963     }
1964   }
1965 
1966   private void checkCompression(final HTableDescriptor htd)
1967   throws IOException {
1968     if (!this.masterCheckCompression) return;
1969     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1970       checkCompression(hcd);
1971     }
1972   }
1973 
1974   private void checkCompression(final HColumnDescriptor hcd)
1975   throws IOException {
1976     if (!this.masterCheckCompression) return;
1977     CompressionTest.testCompression(hcd.getCompression());
1978     CompressionTest.testCompression(hcd.getCompactionCompression());
1979   }
1980 
1981   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1982   throws IOException {
1983     if (!this.masterCheckEncryption) return;
1984     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1985       checkEncryption(conf, hcd);
1986     }
1987   }
1988 
1989   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1990   throws IOException {
1991     if (!this.masterCheckEncryption) return;
1992     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1993   }
1994 
1995   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1996   throws IOException {
1997     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1998     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1999   }
2000 
2001   @Override
2002   public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
2003   throws ServiceException {
2004     HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
2005     byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
2006     try {
2007       createTable(hTableDescriptor,splitKeys);
2008     } catch (IOException ioe) {
2009       throw new ServiceException(ioe);
2010     }
2011     return CreateTableResponse.newBuilder().build();
2012   }
2013 
2014   private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
2015     byte[][] splitKeys) {
2016     HRegionInfo[] hRegionInfos = null;
2017     if (splitKeys == null || splitKeys.length == 0) {
2018       hRegionInfos = new HRegionInfo[]{
2019           new HRegionInfo(hTableDescriptor.getTableName(), null, null)};
2020     } else {
2021       int numRegions = splitKeys.length + 1;
2022       hRegionInfos = new HRegionInfo[numRegions];
2023       byte[] startKey = null;
2024       byte[] endKey = null;
2025       for (int i = 0; i < numRegions; i++) {
2026         endKey = (i == splitKeys.length) ? null : splitKeys[i];
2027         hRegionInfos[i] =
2028             new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey);
2029         startKey = endKey;
2030       }
2031     }
2032     return hRegionInfos;
2033   }
2034 
2035   private static boolean isCatalogTable(final TableName tableName) {
2036     return tableName.equals(TableName.META_TABLE_NAME);
2037   }
2038 
2039   @Override
2040   public void deleteTable(final TableName tableName) throws IOException {
2041     checkInitialized();
2042     if (cpHost != null) {
2043       cpHost.preDeleteTable(tableName);
2044     }
2045     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
2046     this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
2047     if (cpHost != null) {
2048       cpHost.postDeleteTable(tableName);
2049     }
2050   }
2051 
2052   @Override
2053   public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest request)
2054   throws ServiceException {
2055     try {
2056       deleteTable(ProtobufUtil.toTableName(request.getTableName()));
2057     } catch (IOException ioe) {
2058       throw new ServiceException(ioe);
2059     }
2060     return DeleteTableResponse.newBuilder().build();
2061   }
2062 
2063   /**
2064    * Get the number of regions of the table that have been updated by the alter.
2065    *
2066    * @return Pair indicating the number of regions updated Pair.getFirst is the
2067    *         regions that are yet to be updated Pair.getSecond is the total number
2068    *         of regions of the table
2069    * @throws IOException
2070    */
2071   @Override
2072   public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2073       RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException {
2074     // TODO: currently, we query using the table name on the client side. this
2075     // may overlap with other table operations or the table operation may
2076     // have completed before querying this API. We need to refactor to a
2077     // transaction system in the future to avoid these ambiguities.
2078     TableName tableName = ProtobufUtil.toTableName(req.getTableName());
2079 
2080     try {
2081       Pair<Integer,Integer> pair = this.assignmentManager.getReopenStatus(tableName);
2082       GetSchemaAlterStatusResponse.Builder ret = GetSchemaAlterStatusResponse.newBuilder();
2083       ret.setYetToUpdateRegions(pair.getFirst());
2084       ret.setTotalRegions(pair.getSecond());
2085       return ret.build();
2086     } catch (IOException ioe) {
2087       throw new ServiceException(ioe);
2088     }
2089   }
2090 
2091   @Override
2092   public void addColumn(final TableName tableName, final HColumnDescriptor columnDescriptor)
2093       throws IOException {
2094     checkInitialized();
2095     checkCompression(columnDescriptor);
2096     checkEncryption(conf, columnDescriptor);
2097     if (cpHost != null) {
2098       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
2099         return;
2100       }
2101     }
2102     //TODO: we should process this (and some others) in an executor
2103     new TableAddFamilyHandler(tableName, columnDescriptor, this, this).prepare().process();
2104     if (cpHost != null) {
2105       cpHost.postAddColumn(tableName, columnDescriptor);
2106     }
2107   }
2108 
2109   @Override
2110   public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
2111   throws ServiceException {
2112     try {
2113       addColumn(ProtobufUtil.toTableName(req.getTableName()),
2114         HColumnDescriptor.convert(req.getColumnFamilies()));
2115     } catch (IOException ioe) {
2116       throw new ServiceException(ioe);
2117     }
2118     return AddColumnResponse.newBuilder().build();
2119   }
2120 
2121   @Override
2122   public void modifyColumn(TableName tableName, HColumnDescriptor descriptor)
2123       throws IOException {
2124     checkInitialized();
2125     checkCompression(descriptor);
2126     checkEncryption(conf, descriptor);
2127     if (cpHost != null) {
2128       if (cpHost.preModifyColumn(tableName, descriptor)) {
2129         return;
2130       }
2131     }
2132     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2133     new TableModifyFamilyHandler(tableName, descriptor, this, this)
2134       .prepare().process();
2135     if (cpHost != null) {
2136       cpHost.postModifyColumn(tableName, descriptor);
2137     }
2138   }
2139 
2140   @Override
2141   public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
2142   throws ServiceException {
2143     try {
2144       modifyColumn(ProtobufUtil.toTableName(req.getTableName()),
2145         HColumnDescriptor.convert(req.getColumnFamilies()));
2146     } catch (IOException ioe) {
2147       throw new ServiceException(ioe);
2148     }
2149     return ModifyColumnResponse.newBuilder().build();
2150   }
2151 
2152   @Override
2153   public void deleteColumn(final TableName tableName, final byte[] columnName)
2154       throws IOException {
2155     checkInitialized();
2156     if (cpHost != null) {
2157       if (cpHost.preDeleteColumn(tableName, columnName)) {
2158         return;
2159       }
2160     }
2161     LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
2162     new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
2163     if (cpHost != null) {
2164       cpHost.postDeleteColumn(tableName, columnName);
2165     }
2166   }
2167 
2168   @Override
2169   public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
2170   throws ServiceException {
2171     try {
2172       deleteColumn(ProtobufUtil.toTableName(req.getTableName()),
2173           req.getColumnName().toByteArray());
2174     } catch (IOException ioe) {
2175       throw new ServiceException(ioe);
2176     }
2177     return DeleteColumnResponse.newBuilder().build();
2178   }
2179 
2180   @Override
2181   public void enableTable(final TableName tableName) throws IOException {
2182     checkInitialized();
2183     if (cpHost != null) {
2184       cpHost.preEnableTable(tableName);
2185     }
2186     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2187     this.executorService.submit(new EnableTableHandler(this, tableName,
2188       catalogTracker, assignmentManager, tableLockManager, false).prepare());
2189     if (cpHost != null) {
2190       cpHost.postEnableTable(tableName);
2191    }
2192   }
2193 
2194   @Override
2195   public EnableTableResponse enableTable(RpcController controller, EnableTableRequest request)
2196   throws ServiceException {
2197     try {
2198       enableTable(ProtobufUtil.toTableName(request.getTableName()));
2199     } catch (IOException ioe) {
2200       throw new ServiceException(ioe);
2201     }
2202     return EnableTableResponse.newBuilder().build();
2203   }
2204 
2205   @Override
2206   public void disableTable(final TableName tableName) throws IOException {
2207     checkInitialized();
2208     if (cpHost != null) {
2209       cpHost.preDisableTable(tableName);
2210     }
2211     LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2212     this.executorService.submit(new DisableTableHandler(this, tableName,
2213       catalogTracker, assignmentManager, tableLockManager, false).prepare());
2214     if (cpHost != null) {
2215       cpHost.postDisableTable(tableName);
2216     }
2217   }
2218 
2219   @Override
2220   public DisableTableResponse disableTable(RpcController controller, DisableTableRequest request)
2221   throws ServiceException {
2222     try {
2223       disableTable(ProtobufUtil.toTableName(request.getTableName()));
2224     } catch (IOException ioe) {
2225       throw new ServiceException(ioe);
2226     }
2227     return DisableTableResponse.newBuilder().build();
2228   }
2229 
2230   /**
2231    * Return the region and current deployment for the region containing
2232    * the given row. If the region cannot be found, returns null. If it
2233    * is found, but not currently deployed, the second element of the pair
2234    * may be null.
2235    */
2236   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2237       final TableName tableName, final byte [] rowKey)
2238   throws IOException {
2239     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2240       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2241 
2242     MetaScannerVisitor visitor =
2243       new MetaScannerVisitorBase() {
2244         @Override
2245         public boolean processRow(Result data) throws IOException {
2246           if (data == null || data.size() <= 0) {
2247             return true;
2248           }
2249           Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2250           if (pair == null) {
2251             return false;
2252           }
2253           if (!pair.getFirst().getTable().equals(tableName)) {
2254             return false;
2255           }
2256           result.set(pair);
2257           return true;
2258         }
2259     };
2260 
2261     MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1);
2262     return result.get();
2263   }
2264 
2265   @Override
2266   public void modifyTable(final TableName tableName, final HTableDescriptor descriptor)
2267       throws IOException {
2268     checkInitialized();
2269     sanityCheckTableDescriptor(descriptor);
2270     if (cpHost != null) {
2271       cpHost.preModifyTable(tableName, descriptor);
2272     }
2273     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2274     new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
2275     if (cpHost != null) {
2276       cpHost.postModifyTable(tableName, descriptor);
2277     }
2278   }
2279 
2280   @Override
2281   public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
2282   throws ServiceException {
2283     try {
2284       modifyTable(ProtobufUtil.toTableName(req.getTableName()),
2285         HTableDescriptor.convert(req.getTableSchema()));
2286     } catch (IOException ioe) {
2287       throw new ServiceException(ioe);
2288     }
2289     return ModifyTableResponse.newBuilder().build();
2290   }
2291 
2292   @Override
2293   public void checkTableModifiable(final TableName tableName)
2294       throws IOException, TableNotFoundException, TableNotDisabledException {
2295     if (isCatalogTable(tableName)) {
2296       throw new IOException("Can't modify catalog tables");
2297     }
2298     if (!MetaReader.tableExists(getCatalogTracker(), tableName)) {
2299       throw new TableNotFoundException(tableName);
2300     }
2301     if (!getAssignmentManager().getZKTable().
2302         isDisabledTable(tableName)) {
2303       throw new TableNotDisabledException(tableName);
2304     }
2305   }
2306 
2307   @Override
2308   public GetClusterStatusResponse getClusterStatus(RpcController controller,
2309       GetClusterStatusRequest req)
2310   throws ServiceException {
2311     GetClusterStatusResponse.Builder response = GetClusterStatusResponse.newBuilder();
2312     response.setClusterStatus(getClusterStatus().convert());
2313     return response.build();
2314   }
2315 
2316   /**
2317    * @return cluster status
2318    */
2319   public ClusterStatus getClusterStatus() {
2320     // Build Set of backup masters from ZK nodes
2321     List<String> backupMasterStrings;
2322     try {
2323       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2324         this.zooKeeper.backupMasterAddressesZNode);
2325     } catch (KeeperException e) {
2326       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2327       backupMasterStrings = new ArrayList<String>(0);
2328     }
2329     List<ServerName> backupMasters = new ArrayList<ServerName>(
2330                                           backupMasterStrings.size());
2331     for (String s: backupMasterStrings) {
2332       try {
2333         byte [] bytes =
2334             ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2335                 this.zooKeeper.backupMasterAddressesZNode, s));
2336         if (bytes != null) {
2337           ServerName sn;
2338           try {
2339             sn = ServerName.parseFrom(bytes);
2340           } catch (DeserializationException e) {
2341             LOG.warn("Failed parse, skipping registering backup server", e);
2342             continue;
2343           }
2344           backupMasters.add(sn);
2345         }
2346       } catch (KeeperException e) {
2347         LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2348                  "backup servers"), e);
2349       }
2350     }
2351     Collections.sort(backupMasters, new Comparator<ServerName>() {
2352       @Override
2353       public int compare(ServerName s1, ServerName s2) {
2354         return s1.getServerName().compareTo(s2.getServerName());
2355       }});
2356 
2357     return new ClusterStatus(VersionInfo.getVersion(),
2358       this.fileSystemManager.getClusterId().toString(),
2359       this.serverManager.getOnlineServers(),
2360       this.serverManager.getDeadServers().copyServerNames(),
2361       this.serverName,
2362       backupMasters,
2363       this.assignmentManager.getRegionStates().getRegionsInTransition(),
2364       this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
2365   }
2366 
2367   public String getClusterId() {
2368     if (fileSystemManager == null) {
2369       return "";
2370     }
2371     ClusterId id = fileSystemManager.getClusterId();
2372     if (id == null) {
2373       return "";
2374     }
2375     return id.toString();
2376   }
2377 
2378   /**
2379    * The set of loaded coprocessors is stored in a static set. Since it's
2380    * statically allocated, it does not require that HMaster's cpHost be
2381    * initialized prior to accessing it.
2382    * @return a String representation of the set of names of the loaded
2383    * coprocessors.
2384    */
2385   public static String getLoadedCoprocessors() {
2386     return CoprocessorHost.getLoadedCoprocessors().toString();
2387   }
2388 
2389   /**
2390    * @return timestamp in millis when HMaster was started.
2391    */
2392   public long getMasterStartTime() {
2393     return masterStartTime;
2394   }
2395 
2396   /**
2397    * @return timestamp in millis when HMaster became the active master.
2398    */
2399   public long getMasterActiveTime() {
2400     return masterActiveTime;
2401   }
2402 
2403   public int getRegionServerInfoPort(final ServerName sn) {
2404     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2405     if (info == null || info.getInfoPort() == 0) {
2406       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2407         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2408     }
2409     return info.getInfoPort();
2410   }
2411 
2412   /**
2413    * @return array of coprocessor SimpleNames.
2414    */
2415   public String[] getCoprocessors() {
2416     Set<String> masterCoprocessors =
2417         getCoprocessorHost().getCoprocessors();
2418     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2419   }
2420 
2421   @Override
2422   public void abort(final String msg, final Throwable t) {
2423     if (cpHost != null) {
2424       // HBASE-4014: dump a list of loaded coprocessors.
2425       LOG.fatal("Master server abort: loaded coprocessors are: " +
2426           getLoadedCoprocessors());
2427     }
2428 
2429     if (abortNow(msg, t)) {
2430       if (t != null) LOG.fatal(msg, t);
2431       else LOG.fatal(msg);
2432       this.abort = true;
2433       stop("Aborting");
2434     }
2435   }
2436 
2437   /**
2438    * We do the following in a different thread.  If it is not completed
2439    * in time, we will time it out and assume it is not easy to recover.
2440    *
2441    * 1. Create a new ZK session. (since our current one is expired)
2442    * 2. Try to become a primary master again
2443    * 3. Initialize all ZK based system trackers.
2444    * 4. Assign meta. (they are already assigned, but we need to update our
2445    * internal memory state to reflect it)
2446    * 5. Process any RIT if any during the process of our recovery.
2447    *
2448    * @return True if we could successfully recover from ZK session expiry.
2449    * @throws InterruptedException
2450    * @throws IOException
2451    * @throws KeeperException
2452    * @throws ExecutionException
2453    */
2454   private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
2455       IOException, KeeperException, ExecutionException {
2456 
2457     this.zooKeeper.unregisterAllListeners();
2458     // add back listeners which were registered before master initialization
2459     // because they won't be added back in below Master re-initialization code
2460     if (this.registeredZKListenersBeforeRecovery != null) {
2461       for (ZooKeeperListener curListener : this.registeredZKListenersBeforeRecovery) {
2462         this.zooKeeper.registerListener(curListener);
2463       }
2464     }
2465 
2466     this.zooKeeper.reconnectAfterExpiration();
2467 
2468     Callable<Boolean> callable = new Callable<Boolean> () {
2469       @Override
2470       public Boolean call() throws InterruptedException,
2471           IOException, KeeperException {
2472         MonitoredTask status =
2473           TaskMonitor.get().createStatus("Recovering expired ZK session");
2474         try {
2475           if (!becomeActiveMaster(status)) {
2476             return Boolean.FALSE;
2477           }
2478           serverShutdownHandlerEnabled = false;
2479           initialized = false;
2480           finishInitialization(status, true);
2481           return !stopped;
2482         } finally {
2483           status.cleanup();
2484         }
2485       }
2486     };
2487 
2488     long timeout =
2489       conf.getLong("hbase.master.zksession.recover.timeout", 300000);
2490     java.util.concurrent.ExecutorService executor =
2491       Executors.newSingleThreadExecutor();
2492     Future<Boolean> result = executor.submit(callable);
2493     executor.shutdown();
2494     if (executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)
2495         && result.isDone()) {
2496       Boolean recovered = result.get();
2497       if (recovered != null) {
2498         return recovered.booleanValue();
2499       }
2500     }
2501     executor.shutdownNow();
2502     return false;
2503   }
2504 
2505   /**
2506    * Check to see if the current trigger for abort is due to ZooKeeper session
2507    * expiry, and If yes, whether we can recover from ZK session expiry.
2508    *
2509    * @param msg Original abort message
2510    * @param t   The cause for current abort request
2511    * @return true if we should proceed with abort operation, false other wise.
2512    */
2513   private boolean abortNow(final String msg, final Throwable t) {
2514     if (!this.isActiveMaster || this.stopped) {
2515       return true;
2516     }
2517 
2518     boolean failFast = conf.getBoolean("fail.fast.expired.active.master", false);
2519     if (t != null && t instanceof KeeperException.SessionExpiredException
2520         && !failFast) {
2521       try {
2522         LOG.info("Primary Master trying to recover from ZooKeeper session " +
2523             "expiry.");
2524         return !tryRecoveringExpiredZKSession();
2525       } catch (Throwable newT) {
2526         LOG.error("Primary master encountered unexpected exception while " +
2527             "trying to recover from ZooKeeper session" +
2528             " expiry. Proceeding with server abort.", newT);
2529       }
2530     }
2531     return true;
2532   }
2533 
2534   @Override
2535   public ZooKeeperWatcher getZooKeeper() {
2536     return zooKeeper;
2537   }
2538 
2539   @Override
2540   public MasterCoprocessorHost getCoprocessorHost() {
2541     return cpHost;
2542   }
2543 
2544   @Override
2545   public ServerName getServerName() {
2546     return this.serverName;
2547   }
2548 
2549   @Override
2550   public CatalogTracker getCatalogTracker() {
2551     return catalogTracker;
2552   }
2553 
2554   @Override
2555   public AssignmentManager getAssignmentManager() {
2556     return this.assignmentManager;
2557   }
2558 
2559   @Override
2560   public TableLockManager getTableLockManager() {
2561     return this.tableLockManager;
2562   }
2563 
2564   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2565     return rsFatals;
2566   }
2567 
2568   public void shutdown() {
2569     if (spanReceiverHost != null) {
2570       spanReceiverHost.closeReceivers();
2571     }
2572     if (cpHost != null) {
2573       try {
2574         cpHost.preShutdown();
2575       } catch (IOException ioe) {
2576         LOG.error("Error call master coprocessor preShutdown()", ioe);
2577       }
2578     }
2579     if (mxBean != null) {
2580       MBeanUtil.unregisterMBean(mxBean);
2581       mxBean = null;
2582     }
2583     if (this.assignmentManager != null) this.assignmentManager.shutdown();
2584     if (this.serverManager != null) this.serverManager.shutdownCluster();
2585     try {
2586       if (this.clusterStatusTracker != null){
2587         this.clusterStatusTracker.setClusterDown();
2588       }
2589     } catch (KeeperException e) {
2590       LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2591     }
2592   }
2593 
2594   @Override
2595   public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
2596   throws ServiceException {
2597     LOG.info(getClientIdAuditPrefix() + " shutdown");
2598     shutdown();
2599     return ShutdownResponse.newBuilder().build();
2600   }
2601 
2602   public void stopMaster() {
2603     if (cpHost != null) {
2604       try {
2605         cpHost.preStopMaster();
2606       } catch (IOException ioe) {
2607         LOG.error("Error call master coprocessor preStopMaster()", ioe);
2608       }
2609     }
2610     stop("Stopped by " + Thread.currentThread().getName());
2611   }
2612 
2613   @Override
2614   public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
2615   throws ServiceException {
2616     LOG.info(getClientIdAuditPrefix() + " stop");
2617     stopMaster();
2618     return StopMasterResponse.newBuilder().build();
2619   }
2620 
2621   @Override
2622   public void stop(final String why) {
2623     LOG.info(why);
2624     this.stopped = true;
2625     // We wake up the stopSleeper to stop immediately
2626     stopSleeper.skipSleepCycle();
2627     // If we are a backup master, we need to interrupt wait
2628     if (this.activeMasterManager != null) {
2629       synchronized (this.activeMasterManager.clusterHasActiveMaster) {
2630         this.activeMasterManager.clusterHasActiveMaster.notifyAll();
2631       }
2632     }
2633     // If no region server is online then master may stuck waiting on hbase:meta to come on line.
2634     // See HBASE-8422.
2635     if (this.catalogTracker != null && this.serverManager.getOnlineServers().isEmpty()) {
2636       this.catalogTracker.stop();
2637     }
2638   }
2639 
2640   @Override
2641   public boolean isStopped() {
2642     return this.stopped;
2643   }
2644 
2645   @Override
2646   public boolean isAborted() {
2647     return this.abort;
2648   }
2649 
2650   void checkInitialized() throws PleaseHoldException {
2651     if (!this.initialized) {
2652       throw new PleaseHoldException("Master is initializing");
2653     }
2654   }
2655 
2656   /**
2657    * Report whether this master is currently the active master or not.
2658    * If not active master, we are parked on ZK waiting to become active.
2659    *
2660    * This method is used for testing.
2661    *
2662    * @return true if active master, false if not.
2663    */
2664   public boolean isActiveMaster() {
2665     return isActiveMaster;
2666   }
2667 
2668   /**
2669    * Report whether this master has completed with its initialization and is
2670    * ready.  If ready, the master is also the active master.  A standby master
2671    * is never ready.
2672    *
2673    * This method is used for testing.
2674    *
2675    * @return true if master is ready to go, false if not.
2676    */
2677   @Override
2678   public boolean isInitialized() {
2679     return initialized;
2680   }
2681 
2682   /**
2683    * ServerShutdownHandlerEnabled is set false before completing
2684    * assignMeta to prevent processing of ServerShutdownHandler.
2685    * @return true if assignMeta has completed;
2686    */
2687   @Override
2688   public boolean isServerShutdownHandlerEnabled() {
2689     return this.serverShutdownHandlerEnabled;
2690   }
2691 
2692   /**
2693    * Report whether this master has started initialization and is about to do meta region assignment
2694    * @return true if master is in initialization & about to assign hbase:meta regions
2695    */
2696   public boolean isInitializationStartsMetaRegionAssignment() {
2697     return this.initializationBeforeMetaAssignment;
2698   }
2699 
2700   @Override
2701   public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
2702   throws ServiceException {
2703     try {
2704       final byte [] regionName = req.getRegion().getValue().toByteArray();
2705       RegionSpecifierType type = req.getRegion().getType();
2706       AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
2707 
2708       checkInitialized();
2709       if (type != RegionSpecifierType.REGION_NAME) {
2710         LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2711           + " actual: " + type);
2712       }
2713       HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
2714       if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
2715       if (cpHost != null) {
2716         if (cpHost.preAssign(regionInfo)) {
2717           return arr;
2718         }
2719       }
2720       LOG.info(getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
2721       assignmentManager.assign(regionInfo, true, true);
2722       if (cpHost != null) {
2723         cpHost.postAssign(regionInfo);
2724       }
2725 
2726       return arr;
2727     } catch (IOException ioe) {
2728       throw new ServiceException(ioe);
2729     }
2730   }
2731 
2732   public void assignRegion(HRegionInfo hri) {
2733     assignmentManager.assign(hri, true);
2734   }
2735 
2736   @Override
2737   public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
2738   throws ServiceException {
2739     try {
2740       final byte [] regionName = req.getRegion().getValue().toByteArray();
2741       RegionSpecifierType type = req.getRegion().getType();
2742       final boolean force = req.getForce();
2743       UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
2744 
2745       checkInitialized();
2746       if (type != RegionSpecifierType.REGION_NAME) {
2747         LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2748           + " actual: " + type);
2749       }
2750       Pair<HRegionInfo, ServerName> pair =
2751         MetaReader.getRegion(this.catalogTracker, regionName);
2752       if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
2753       HRegionInfo hri = pair.getFirst();
2754       if (cpHost != null) {
2755         if (cpHost.preUnassign(hri, force)) {
2756           return urr;
2757         }
2758       }
2759       LOG.debug(getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
2760           + " in current location if it is online and reassign.force=" + force);
2761       this.assignmentManager.unassign(hri, force);
2762       if (this.assignmentManager.getRegionStates().isRegionOffline(hri)) {
2763         LOG.debug("Region " + hri.getRegionNameAsString()
2764             + " is not online on any region server, reassigning it.");
2765         assignRegion(hri);
2766       }
2767       if (cpHost != null) {
2768         cpHost.postUnassign(hri, force);
2769       }
2770 
2771       return urr;
2772     } catch (IOException ioe) {
2773       throw new ServiceException(ioe);
2774     }
2775   }
2776 
2777   /**
2778    * Get list of TableDescriptors for requested tables.
2779    * @param controller Unused (set to null).
2780    * @param req GetTableDescriptorsRequest that contains:
2781    * - tableNames: requested tables, or if empty, all are requested
2782    * @return GetTableDescriptorsResponse
2783    * @throws ServiceException
2784    */
2785   @Override
2786   public GetTableDescriptorsResponse getTableDescriptors(
2787 	      RpcController controller, GetTableDescriptorsRequest req) throws ServiceException {
2788     List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2789     List<TableName> tableNameList = new ArrayList<TableName>();
2790     for(HBaseProtos.TableName tableNamePB: req.getTableNamesList()) {
2791       tableNameList.add(ProtobufUtil.toTableName(tableNamePB));
2792     }
2793     boolean bypass = false;
2794     if (this.cpHost != null) {
2795       try {
2796         bypass = this.cpHost.preGetTableDescriptors(tableNameList, descriptors);
2797       } catch (IOException ioe) {
2798         throw new ServiceException(ioe);
2799       }
2800     }
2801 
2802     if (!bypass) {
2803       if (req.getTableNamesCount() == 0) {
2804         // request for all TableDescriptors
2805         Map<String, HTableDescriptor> descriptorMap = null;
2806         try {
2807           descriptorMap = this.tableDescriptors.getAll();
2808         } catch (IOException e) {
2809           LOG.warn("Failed getting all descriptors", e);
2810         }
2811         if (descriptorMap != null) {
2812           for(HTableDescriptor desc: descriptorMap.values()) {
2813             if(!desc.getTableName().isSystemTable()) {
2814               descriptors.add(desc);
2815             }
2816           }
2817         }
2818       } else {
2819         for (TableName s: tableNameList) {
2820           try {
2821             HTableDescriptor desc = this.tableDescriptors.get(s);
2822             if (desc != null) {
2823               descriptors.add(desc);
2824             }
2825           } catch (IOException e) {
2826             LOG.warn("Failed getting descriptor for " + s, e);
2827           }
2828         }
2829       }
2830 
2831       if (this.cpHost != null) {
2832         try {
2833           this.cpHost.postGetTableDescriptors(descriptors);
2834         } catch (IOException ioe) {
2835           throw new ServiceException(ioe);
2836         }
2837       }
2838     }
2839 
2840     GetTableDescriptorsResponse.Builder builder = GetTableDescriptorsResponse.newBuilder();
2841     for (HTableDescriptor htd: descriptors) {
2842       builder.addTableSchema(htd.convert());
2843     }
2844     return builder.build();
2845   }
2846 
2847   /**
2848    * Get list of userspace table names
2849    * @param controller Unused (set to null).
2850    * @param req GetTableNamesRequest
2851    * @return GetTableNamesResponse
2852    * @throws ServiceException
2853    */
2854   @Override
2855   public GetTableNamesResponse getTableNames(
2856         RpcController controller, GetTableNamesRequest req) throws ServiceException {
2857     try {
2858       Collection<HTableDescriptor> descriptors = this.tableDescriptors.getAll().values();
2859       GetTableNamesResponse.Builder builder = GetTableNamesResponse.newBuilder();
2860       for (HTableDescriptor descriptor: descriptors) {
2861         if (descriptor.getTableName().isSystemTable()) {
2862           continue;
2863         }
2864         builder.addTableNames(ProtobufUtil.toProtoTableName(descriptor.getTableName()));
2865       }
2866       return builder.build();
2867     } catch (IOException e) {
2868       throw new ServiceException(e);
2869     }
2870   }
2871 
2872   /**
2873    * Compute the average load across all region servers.
2874    * Currently, this uses a very naive computation - just uses the number of
2875    * regions being served, ignoring stats about number of requests.
2876    * @return the average load
2877    */
2878   public double getAverageLoad() {
2879     if (this.assignmentManager == null) {
2880       return 0;
2881     }
2882 
2883     RegionStates regionStates = this.assignmentManager.getRegionStates();
2884     if (regionStates == null) {
2885       return 0;
2886     }
2887     return regionStates.getAverageLoad();
2888   }
2889 
2890   /**
2891    * Offline specified region from master's in-memory state. It will not attempt to
2892    * reassign the region as in unassign.
2893    *
2894    * This is a special method that should be used by experts or hbck.
2895    *
2896    */
2897   @Override
2898   public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request)
2899   throws ServiceException {
2900     final byte [] regionName = request.getRegion().getValue().toByteArray();
2901     RegionSpecifierType type = request.getRegion().getType();
2902     if (type != RegionSpecifierType.REGION_NAME) {
2903       LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2904         + " actual: " + type);
2905     }
2906 
2907     try {
2908       Pair<HRegionInfo, ServerName> pair =
2909         MetaReader.getRegion(this.catalogTracker, regionName);
2910       if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
2911       HRegionInfo hri = pair.getFirst();
2912       if (cpHost != null) {
2913         cpHost.preRegionOffline(hri);
2914       }
2915       LOG.info(getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString());
2916       this.assignmentManager.regionOffline(hri);
2917       if (cpHost != null) {
2918         cpHost.postRegionOffline(hri);
2919       }
2920     } catch (IOException ioe) {
2921       throw new ServiceException(ioe);
2922     }
2923     return OfflineRegionResponse.newBuilder().build();
2924   }
2925 
2926   @Override
2927   public boolean registerService(Service instance) {
2928     /*
2929      * No stacking of instances is allowed for a single service name
2930      */
2931     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2932     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2933       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2934           " already registered, rejecting request from "+instance
2935       );
2936       return false;
2937     }
2938 
2939     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2940     if (LOG.isDebugEnabled()) {
2941       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2942     }
2943     return true;
2944   }
2945 
2946   @Override
2947   public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
2948       final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
2949     try {
2950       ServerRpcController execController = new ServerRpcController();
2951 
2952       ClientProtos.CoprocessorServiceCall call = request.getCall();
2953       String serviceName = call.getServiceName();
2954       String methodName = call.getMethodName();
2955       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
2956         throw new UnknownProtocolException(null,
2957             "No registered master coprocessor service found for name "+serviceName);
2958       }
2959 
2960       Service service = coprocessorServiceHandlers.get(serviceName);
2961       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
2962       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
2963       if (methodDesc == null) {
2964         throw new UnknownProtocolException(service.getClass(),
2965             "Unknown method "+methodName+" called on master service "+serviceName);
2966       }
2967 
2968       //invoke the method
2969       Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
2970           .mergeFrom(call.getRequest()).build();
2971       final Message.Builder responseBuilder =
2972           service.getResponsePrototype(methodDesc).newBuilderForType();
2973       service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
2974         @Override
2975         public void run(Message message) {
2976           if (message != null) {
2977             responseBuilder.mergeFrom(message);
2978           }
2979         }
2980       });
2981       Message execResult = responseBuilder.build();
2982 
2983       if (execController.getFailedOn() != null) {
2984         throw execController.getFailedOn();
2985       }
2986       ClientProtos.CoprocessorServiceResponse.Builder builder =
2987           ClientProtos.CoprocessorServiceResponse.newBuilder();
2988       builder.setRegion(RequestConverter.buildRegionSpecifier(
2989           RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
2990       builder.setValue(
2991           builder.getValueBuilder().setName(execResult.getClass().getName())
2992               .setValue(execResult.toByteString()));
2993       return builder.build();
2994     } catch (IOException ie) {
2995       throw new ServiceException(ie);
2996     }
2997   }
2998 
2999   /**
3000    * Utility for constructing an instance of the passed HMaster class.
3001    * @param masterClass
3002    * @param conf
3003    * @return HMaster instance.
3004    */
3005   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
3006       final Configuration conf)  {
3007     try {
3008       Constructor<? extends HMaster> c =
3009         masterClass.getConstructor(Configuration.class);
3010       return c.newInstance(conf);
3011     } catch (InvocationTargetException ite) {
3012       Throwable target = ite.getTargetException() != null?
3013         ite.getTargetException(): ite;
3014       if (target.getCause() != null) target = target.getCause();
3015       throw new RuntimeException("Failed construction of Master: " +
3016         masterClass.toString(), target);
3017     } catch (Exception e) {
3018       throw new RuntimeException("Failed construction of Master: " +
3019         masterClass.toString() + ((e.getCause() != null)?
3020           e.getCause().getMessage(): ""), e);
3021     }
3022   }
3023 
3024   /**
3025    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
3026    */
3027   public static void main(String [] args) {
3028     VersionInfo.logVersion();
3029     new HMasterCommandLine(HMaster.class).doMain(args);
3030   }
3031 
3032   public HFileCleaner getHFileCleaner() {
3033     return this.hfileCleaner;
3034   }
3035 
3036   /**
3037    * Exposed for TESTING!
3038    * @return the underlying snapshot manager
3039    */
3040   public SnapshotManager getSnapshotManagerForTesting() {
3041     return this.snapshotManager;
3042   }
3043 
3044   /**
3045    * Triggers an asynchronous attempt to take a snapshot.
3046    * {@inheritDoc}
3047    */
3048   @Override
3049   public SnapshotResponse snapshot(RpcController controller, SnapshotRequest request)
3050       throws ServiceException {
3051     try {
3052       this.snapshotManager.checkSnapshotSupport();
3053     } catch (UnsupportedOperationException e) {
3054       throw new ServiceException(e);
3055     }
3056 
3057     LOG.info(getClientIdAuditPrefix() + " snapshot request for:" +
3058         ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));
3059     // get the snapshot information
3060     SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(request.getSnapshot(),
3061       this.conf);
3062     try {
3063       snapshotManager.takeSnapshot(snapshot);
3064     } catch (IOException e) {
3065       throw new ServiceException(e);
3066     }
3067 
3068     // send back the max amount of time the client should wait for the snapshot to complete
3069     long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
3070       SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
3071     return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
3072   }
3073 
3074   /**
3075    * List the currently available/stored snapshots. Any in-progress snapshots are ignored
3076    */
3077   @Override
3078   public GetCompletedSnapshotsResponse getCompletedSnapshots(RpcController controller,
3079       GetCompletedSnapshotsRequest request) throws ServiceException {
3080     try {
3081       GetCompletedSnapshotsResponse.Builder builder = GetCompletedSnapshotsResponse.newBuilder();
3082       List<SnapshotDescription> snapshots = snapshotManager.getCompletedSnapshots();
3083 
3084       // convert to protobuf
3085       for (SnapshotDescription snapshot : snapshots) {
3086         builder.addSnapshots(snapshot);
3087       }
3088       return builder.build();
3089     } catch (IOException e) {
3090       throw new ServiceException(e);
3091     }
3092   }
3093 
3094   /**
3095    * Execute Delete Snapshot operation.
3096    * @return DeleteSnapshotResponse (a protobuf wrapped void) if the snapshot existed and was
3097    *    deleted properly.
3098    * @throws ServiceException wrapping SnapshotDoesNotExistException if specified snapshot did not
3099    *    exist.
3100    */
3101   @Override
3102   public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
3103       DeleteSnapshotRequest request) throws ServiceException {
3104     try {
3105       this.snapshotManager.checkSnapshotSupport();
3106     } catch (UnsupportedOperationException e) {
3107       throw new ServiceException(e);
3108     }
3109 
3110     try {
3111       LOG.info(getClientIdAuditPrefix() + " delete " + request.getSnapshot());
3112       snapshotManager.deleteSnapshot(request.getSnapshot());
3113       return DeleteSnapshotResponse.newBuilder().build();
3114     } catch (IOException e) {
3115       throw new ServiceException(e);
3116     }
3117   }
3118 
3119   /**
3120    * Checks if the specified snapshot is done.
3121    * @return true if the snapshot is in file system ready to use,
3122    *   false if the snapshot is in the process of completing
3123    * @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or
3124    *  a wrapped HBaseSnapshotException with progress failure reason.
3125    */
3126   @Override
3127   public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
3128       IsSnapshotDoneRequest request) throws ServiceException {
3129     LOG.debug("Checking to see if snapshot from request:" +
3130         ClientSnapshotDescriptionUtils.toString(request.getSnapshot()) + " is done");
3131     try {
3132       IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder();
3133       boolean done = snapshotManager.isSnapshotDone(request.getSnapshot());
3134       builder.setDone(done);
3135       return builder.build();
3136     } catch (IOException e) {
3137       throw new ServiceException(e);
3138     }
3139   }
3140 
3141   /**
3142    * Execute Restore/Clone snapshot operation.
3143    *
3144    * <p>If the specified table exists a "Restore" is executed, replacing the table
3145    * schema and directory data with the content of the snapshot.
3146    * The table must be disabled, or a UnsupportedOperationException will be thrown.
3147    *
3148    * <p>If the table doesn't exist a "Clone" is executed, a new table is created
3149    * using the schema at the time of the snapshot, and the content of the snapshot.
3150    *
3151    * <p>The restore/clone operation does not require copying HFiles. Since HFiles
3152    * are immutable the table can point to and use the same files as the original one.
3153    */
3154   @Override
3155   public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
3156       RestoreSnapshotRequest request) throws ServiceException {
3157     try {
3158       this.snapshotManager.checkSnapshotSupport();
3159     } catch (UnsupportedOperationException e) {
3160       throw new ServiceException(e);
3161     }
3162 
3163     // ensure namespace exists
3164     try {
3165       TableName dstTable = TableName.valueOf(request.getSnapshot().getTable());
3166       getNamespaceDescriptor(dstTable.getNamespaceAsString());
3167     } catch (IOException ioe) {
3168       throw new ServiceException(ioe);
3169     }
3170 
3171     try {
3172       SnapshotDescription reqSnapshot = request.getSnapshot();
3173       snapshotManager.restoreSnapshot(reqSnapshot);
3174       return RestoreSnapshotResponse.newBuilder().build();
3175     } catch (IOException e) {
3176       throw new ServiceException(e);
3177     }
3178   }
3179 
3180   /**
3181    * Returns the status of the requested snapshot restore/clone operation.
3182    * This method is not exposed to the user, it is just used internally by HBaseAdmin
3183    * to verify if the restore is completed.
3184    *
3185    * No exceptions are thrown if the restore is not running, the result will be "done".
3186    *
3187    * @return done <tt>true</tt> if the restore/clone operation is completed.
3188    * @throws ServiceException if the operation failed.
3189    */
3190   @Override
3191   public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
3192       IsRestoreSnapshotDoneRequest request) throws ServiceException {
3193     try {
3194       SnapshotDescription snapshot = request.getSnapshot();
3195       IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
3196       boolean done = snapshotManager.isRestoreDone(snapshot);
3197       builder.setDone(done);
3198       return builder.build();
3199     } catch (IOException e) {
3200       throw new ServiceException(e);
3201     }
3202   }
3203 
3204   /**
3205    * Triggers an asynchronous attempt to run a distributed procedure.
3206    * {@inheritDoc}
3207    */
3208   @Override
3209   public ExecProcedureResponse execProcedure(RpcController controller,
3210       ExecProcedureRequest request) throws ServiceException {
3211     ProcedureDescription desc = request.getProcedure();
3212     MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
3213         .getSignature());
3214     if (mpm == null) {
3215       throw new ServiceException("The procedure is not registered: "
3216           + desc.getSignature());
3217     }
3218 
3219     LOG.info(getClientIdAuditPrefix() + " procedure request for: "
3220         + desc.getSignature());
3221 
3222     try {
3223       mpm.execProcedure(desc);
3224     } catch (IOException e) {
3225       throw new ServiceException(e);
3226     }
3227 
3228     // send back the max amount of time the client should wait for the procedure
3229     // to complete
3230     long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
3231     return ExecProcedureResponse.newBuilder().setExpectedTimeout(waitTime)
3232         .build();
3233   }
3234 
3235   /**
3236    * Checks if the specified procedure is done.
3237    * @return true if the procedure is done,
3238    *   false if the procedure is in the process of completing
3239    * @throws ServiceException if invalid procedure, or
3240    *  a failed procedure with progress failure reason.
3241    */
3242   @Override
3243   public IsProcedureDoneResponse isProcedureDone(RpcController controller,
3244       IsProcedureDoneRequest request) throws ServiceException {
3245     ProcedureDescription desc = request.getProcedure();
3246     MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
3247         .getSignature());
3248     if (mpm == null) {
3249       throw new ServiceException("The procedure is not registered: "
3250           + desc.getSignature());
3251     }
3252     LOG.debug("Checking to see if procedure from request:"
3253         + desc.getSignature() + " is done");
3254 
3255     try {
3256       IsProcedureDoneResponse.Builder builder = IsProcedureDoneResponse
3257           .newBuilder();
3258       boolean done = mpm.isProcedureDone(desc);
3259       builder.setDone(done);
3260       return builder.build();
3261     } catch (IOException e) {
3262       throw new ServiceException(e);
3263     }
3264   }
3265 
3266   @Override
3267   public ModifyNamespaceResponse modifyNamespace(RpcController controller,
3268       ModifyNamespaceRequest request) throws ServiceException {
3269     try {
3270       modifyNamespace(ProtobufUtil.toNamespaceDescriptor(request.getNamespaceDescriptor()));
3271       return ModifyNamespaceResponse.getDefaultInstance();
3272     } catch (IOException e) {
3273       throw new ServiceException(e);
3274     }
3275   }
3276 
3277   @Override
3278   public CreateNamespaceResponse createNamespace(RpcController controller,
3279      CreateNamespaceRequest request) throws ServiceException {
3280     try {
3281       createNamespace(ProtobufUtil.toNamespaceDescriptor(request.getNamespaceDescriptor()));
3282       return CreateNamespaceResponse.getDefaultInstance();
3283     } catch (IOException e) {
3284       throw new ServiceException(e);
3285     }
3286   }
3287 
3288   @Override
3289   public DeleteNamespaceResponse deleteNamespace(RpcController controller,
3290       DeleteNamespaceRequest request) throws ServiceException {
3291     try {
3292       deleteNamespace(request.getNamespaceName());
3293       return DeleteNamespaceResponse.getDefaultInstance();
3294     } catch (IOException e) {
3295       throw new ServiceException(e);
3296     }
3297   }
3298 
3299   @Override
3300   public GetNamespaceDescriptorResponse getNamespaceDescriptor(
3301       RpcController controller, GetNamespaceDescriptorRequest request)
3302       throws ServiceException {
3303     try {
3304       return GetNamespaceDescriptorResponse.newBuilder()
3305           .setNamespaceDescriptor(
3306               ProtobufUtil.toProtoNamespaceDescriptor(getNamespaceDescriptor(request.getNamespaceName())))
3307           .build();
3308     } catch (IOException e) {
3309       throw new ServiceException(e);
3310     }
3311   }
3312 
3313   @Override
3314   public ListNamespaceDescriptorsResponse listNamespaceDescriptors(
3315       RpcController controller, ListNamespaceDescriptorsRequest request)
3316       throws ServiceException {
3317     try {
3318       ListNamespaceDescriptorsResponse.Builder response =
3319           ListNamespaceDescriptorsResponse.newBuilder();
3320       for(NamespaceDescriptor ns: listNamespaceDescriptors()) {
3321         response.addNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(ns));
3322       }
3323       return response.build();
3324     } catch (IOException e) {
3325       throw new ServiceException(e);
3326     }
3327   }
3328 
3329   @Override
3330   public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
3331       RpcController controller, ListTableDescriptorsByNamespaceRequest request)
3332       throws ServiceException {
3333     try {
3334       ListTableDescriptorsByNamespaceResponse.Builder b =
3335           ListTableDescriptorsByNamespaceResponse.newBuilder();
3336       for(HTableDescriptor htd: listTableDescriptorsByNamespace(request.getNamespaceName())) {
3337         b.addTableSchema(htd.convert());
3338       }
3339       return b.build();
3340     } catch (IOException e) {
3341       throw new ServiceException(e);
3342     }
3343   }
3344 
3345   @Override
3346   public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
3347       RpcController controller, ListTableNamesByNamespaceRequest request)
3348       throws ServiceException {
3349     try {
3350       ListTableNamesByNamespaceResponse.Builder b =
3351           ListTableNamesByNamespaceResponse.newBuilder();
3352       for (TableName tableName: listTableNamesByNamespace(request.getNamespaceName())) {
3353         b.addTableName(ProtobufUtil.toProtoTableName(tableName));
3354       }
3355       return b.build();
3356     } catch (IOException e) {
3357       throw new ServiceException(e);
3358     }
3359   }
3360 
3361   private boolean isHealthCheckerConfigured() {
3362     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3363     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3364   }
3365 
3366   @Override
3367   public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
3368     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
3369     if (cpHost != null) {
3370       if (cpHost.preCreateNamespace(descriptor)) {
3371         return;
3372       }
3373     }
3374     LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
3375     tableNamespaceManager.create(descriptor);
3376     if (cpHost != null) {
3377       cpHost.postCreateNamespace(descriptor);
3378     }
3379   }
3380 
3381   @Override
3382   public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
3383     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
3384     if (cpHost != null) {
3385       if (cpHost.preModifyNamespace(descriptor)) {
3386         return;
3387       }
3388     }
3389     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
3390     tableNamespaceManager.update(descriptor);
3391     if (cpHost != null) {
3392       cpHost.postModifyNamespace(descriptor);
3393     }
3394   }
3395 
3396   @Override
3397   public void deleteNamespace(String name) throws IOException {
3398     if (cpHost != null) {
3399       if (cpHost.preDeleteNamespace(name)) {
3400         return;
3401       }
3402     }
3403     LOG.info(getClientIdAuditPrefix() + " delete " + name);
3404     tableNamespaceManager.remove(name);
3405     if (cpHost != null) {
3406       cpHost.postDeleteNamespace(name);
3407     }
3408   }
3409 
3410   @Override
3411   public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
3412     boolean ready = tableNamespaceManager != null &&
3413         tableNamespaceManager.isTableAvailableAndInitialized();
3414     if (!ready) {
3415       throw new IOException("Table Namespace Manager not ready yet, try again later");
3416     }
3417     NamespaceDescriptor nsd = tableNamespaceManager.get(name);
3418     if (nsd == null) {
3419       throw new NamespaceNotFoundException(name);
3420     }
3421     return nsd;
3422   }
3423 
3424   @Override
3425   public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
3426     return Lists.newArrayList(tableNamespaceManager.list());
3427   }
3428 
3429   @Override
3430   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3431     getNamespaceDescriptor(name); // check that namespace exists
3432     return Lists.newArrayList(tableDescriptors.getByNamespace(name).values());
3433   }
3434 
3435   @Override
3436   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3437     List<TableName> tableNames = Lists.newArrayList();
3438     getNamespaceDescriptor(name); // check that namespace exists
3439     for (HTableDescriptor descriptor: tableDescriptors.getByNamespace(name).values()) {
3440       tableNames.add(descriptor.getTableName());
3441     }
3442     return tableNames;
3443   }
3444 
3445   @Override
3446   public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController controller,
3447       ReportRegionStateTransitionRequest req) throws ServiceException {
3448     try {
3449       RegionStateTransition rt = req.getTransition(0);
3450       TableName tableName = ProtobufUtil.toTableName(
3451         rt.getRegionInfo(0).getTableName());
3452       RegionStates regionStates = assignmentManager.getRegionStates();
3453       if (!(TableName.META_TABLE_NAME.equals(tableName)
3454           && regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null)
3455           && !assignmentManager.isFailoverCleanupDone()) {
3456         // Meta region is assigned before master finishes the
3457         // failover cleanup. So no need this check for it
3458         throw new PleaseHoldException("Master is rebuilding user regions");
3459       }
3460       ServerName sn = ProtobufUtil.toServerName(req.getServer());
3461       String error = assignmentManager.onRegionTransition(sn, rt);
3462       ReportRegionStateTransitionResponse.Builder rrtr =
3463         ReportRegionStateTransitionResponse.newBuilder();
3464       if (error != null) {
3465         rrtr.setErrorMessage(error);
3466       }
3467       return rrtr.build();
3468     } catch (IOException ioe) {
3469       throw new ServiceException(ioe);
3470     }
3471   }
3472 
3473   @Override
3474   public void truncateTable(TableName tableName, boolean preserveSplits) throws IOException {
3475     checkInitialized();
3476     if (cpHost != null) {
3477       cpHost.preTruncateTable(tableName);
3478     }
3479     LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
3480     TruncateTableHandler handler = new TruncateTableHandler(tableName, this, this, preserveSplits);
3481     handler.prepare();
3482     handler.process();
3483     if (cpHost != null) {
3484       cpHost.postTruncateTable(tableName);
3485     }
3486   }
3487 
3488 
3489   @Override
3490   public TruncateTableResponse truncateTable(RpcController controller, TruncateTableRequest request)
3491       throws ServiceException {
3492     try {
3493       truncateTable(ProtobufUtil.toTableName(request.getTableName()), request.getPreserveSplits());
3494     } catch (IOException e) {
3495       throw new ServiceException(e);
3496     }
3497     return TruncateTableResponse.newBuilder().build();
3498   }
3499 
3500 }