1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.UndeclaredThrowableException;
26  import java.net.SocketException;
27  import java.util.ArrayList;
28  import java.util.Date;
29  import java.util.HashSet;
30  import java.util.LinkedHashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.NavigableMap;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.ConcurrentMap;
38  import java.util.concurrent.ConcurrentSkipListMap;
39  import java.util.concurrent.ConcurrentSkipListSet;
40  import java.util.concurrent.CopyOnWriteArraySet;
41  import java.util.concurrent.ExecutorService;
42  import java.util.concurrent.LinkedBlockingQueue;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicInteger;
47  
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.hbase.Chore;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.HRegionLocation;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.MasterNotRunningException;
60  import org.apache.hadoop.hbase.RegionTooBusyException;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.Stoppable;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotEnabledException;
65  import org.apache.hadoop.hbase.TableNotFoundException;
66  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
67  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
68  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
69  import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
70  import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
71  import org.apache.hadoop.hbase.client.coprocessor.Batch;
72  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
73  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
74  import org.apache.hadoop.hbase.ipc.RpcClient;
75  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
76  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
77  import org.apache.hadoop.hbase.protobuf.RequestConverter;
78  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
79  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
80  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
81  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
82  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
83  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
84  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
85  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
86  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
88  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
89  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
90  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
91  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
92  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
93  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
94  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
95  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
96  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
97  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
165 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
166 import org.apache.hadoop.hbase.security.User;
167 import org.apache.hadoop.hbase.security.UserProvider;
168 import org.apache.hadoop.hbase.util.Bytes;
169 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
170 import org.apache.hadoop.hbase.util.ExceptionUtil;
171 import org.apache.hadoop.hbase.util.Threads;
172 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
173 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
174 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
175 import org.apache.hadoop.ipc.RemoteException;
176 import org.apache.zookeeper.KeeperException;
177 
178 import com.google.common.annotations.VisibleForTesting;
179 import com.google.protobuf.BlockingRpcChannel;
180 import com.google.protobuf.RpcController;
181 import com.google.protobuf.ServiceException;
182 
183 /**
184  * A non-instantiable class that manages creation of {@link HConnection}s.
185  * <p>The simplest way to use this class is by using {@link #createConnection(Configuration)}.
186  * This creates a new {@link HConnection} to the cluster that is managed by the caller.
187  * From this {@link HConnection} {@link HTableInterface} implementations are retrieved
188  * with {@link HConnection#getTable(byte[])}. Example:
189  * <pre>
190  * {@code
191  * HConnection connection = HConnectionManager.createConnection(config);
192  * HTableInterface table = connection.getTable("table1");
193  * try {
194  *   // Use the table as needed, for a single operation and a single thread
195  * } finally {
196  *   table.close();
197  *   connection.close();
198  * }
199  * }</pre>
200  * <p>This class has a static Map of {@link HConnection} instances keyed by
201  * {@link HConnectionKey}; A {@link HConnectionKey} is identified by a set of
202  * {@link Configuration} properties. Invocations of {@link #getConnection(Configuration)}
203  * that pass the same {@link Configuration} instance will return the same
204  * {@link HConnection} instance ONLY WHEN the set of properties is the same
205  * (i.e. if you change properties in your {@link Configuration} instance, such as the RPC
206  * timeout or the codec used, HBase will create a new {@link HConnection} instance; for more
207  * details on how this is done, see {@link HConnectionKey}).
208  * <p>Sharing {@link HConnection} instances is usually what you want; all clients
209  * of the {@link HConnection} instances share the HConnection's cache of Region
210  * locations rather than each having to discover for itself the location of meta, etc.
211  * But sharing connections makes clean up of {@link HConnection} instances a little awkward.
212  * Currently, clients clean up by calling {@link #deleteConnection(Configuration)}. This will
213  * shut down the ZooKeeper connection the HConnection was using and clean up all
214  * HConnection resources as well as stopping proxies to servers out on the
215  * cluster. Not running the cleanup will not end the world; it'll
216  * just stall the shutdown some and spew some "zookeeper connection failed"
217  * messages into the log. Running the cleanup on a {@link HConnection} that is
218  * subsequently used by another client will cause breakage, so be careful about
219  * running cleanup.
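     * <p>A minimal sketch of the shared-connection pattern just described (the table
     * name and row are illustrative):
     * <pre>
     * {@code
     * HConnection shared = HConnectionManager.getConnection(conf);
     * try {
     *   // e.g. consult the shared cache of region locations
     *   HRegionLocation loc = shared.locateRegion(TableName.valueOf("table1"), row);
     * } finally {
     *   HConnectionManager.deleteConnection(conf); // decrements the shared reference count
     * }
     * }</pre>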
220  * <p>To create a {@link HConnection} that is not shared by others, you can
221  * set property "hbase.client.instance.id" to a unique value for your {@link Configuration}
222  * instance, like the following:
223  * <pre>
224  * {@code
225  * conf.set("hbase.client.instance.id", "12345");
226  * HConnection connection = HConnectionManager.getConnection(conf);
227  * // Use the connection to your heart's delight and then when done...
228  * HConnectionManager.deleteConnection(conf);
230  * }
231  * </pre>
232  * <p>Cleanup used to be done in a shutdown hook.  On startup we'd
233  * register a shutdown hook that called {@link #deleteAllConnections()}
234  * on its way out, but the order in which shutdown hooks run is not defined, which
235  * was problematic for clients of HConnection that wanted to register their
236  * own shutdown hooks. So we removed ours, though this shifts the onus for
237  * cleanup to the client.
238  */
239 @SuppressWarnings("serial")
240 @InterfaceAudience.Public
241 @InterfaceStability.Evolving
242 public class HConnectionManager {
243   static final Log LOG = LogFactory.getLog(HConnectionManager.class);
244 
245   public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
246   private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
247 
248   // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
249   // access must be synchronized.  This map is not private because tests
250   // need to be able to tinker with it.
251   static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
252 
253   public static final int MAX_CACHED_CONNECTION_INSTANCES;
254 
255   /**
256    * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
257    * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
258    */
259   private static volatile NonceGenerator nonceGenerator = null;
260   /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
261   private static Object nonceGeneratorCreateLock = new Object();
262 
263   static {
264     // We set instances to one more than the value specified for {@link
265     // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
266     // connections to the ensemble from the one client is 30, so in that case we
267     // should run into zk issues before the LRU hits this value of 31.
268     MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
269       HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
270     CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
271         (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
272       @Override
273       protected boolean removeEldestEntry(
274           Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
275          return size() > MAX_CACHED_CONNECTION_INSTANCES;
276        }
277     };
278   }
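  // For example, with the default of 30 ZooKeeper connections per client, the LRU
  // above holds at most 31 cached HConnections before evicting the eldest.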
279 
280   /*
281    * Non-instantiable.
282    */
283   private HConnectionManager() {
284     super();
285   }
286 
287   /**
288    * @param conn The connection for which to replace the generator.
289    * @param cnm Replaces the nonce generator used, for testing.
290    * @return old nonce generator.
291    */
292   @VisibleForTesting
293   public static NonceGenerator injectNonceGeneratorForTesting(
294       HConnection conn, NonceGenerator cnm) {
295     NonceGenerator ng = conn.getNonceGenerator();
296     LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
297     ((HConnectionImplementation)conn).nonceGenerator = cnm;
298     return ng;
299   }
300 
301   /**
302    * Get the connection that goes with the passed <code>conf</code> configuration instance.
303    * If no current connection exists, method creates a new connection and keys it using
304    * connection-specific properties from the passed {@link Configuration}; see
305    * {@link HConnectionKey}.
306    * @param conf configuration
307    * @return HConnection object for <code>conf</code>
308   * @throws ZooKeeperConnectionException
   * @deprecated Use {@link #createConnection(Configuration)} instead.
309    */
310   @Deprecated
311   public static HConnection getConnection(final Configuration conf)
312   throws IOException {
313     HConnectionKey connectionKey = new HConnectionKey(conf);
314     synchronized (CONNECTION_INSTANCES) {
315       HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
316       if (connection == null) {
317         connection = (HConnectionImplementation)createConnection(conf, true);
318         CONNECTION_INSTANCES.put(connectionKey, connection);
319       } else if (connection.isClosed()) {
320         HConnectionManager.deleteConnection(connectionKey, true);
321         connection = (HConnectionImplementation)createConnection(conf, true);
322         CONNECTION_INSTANCES.put(connectionKey, connection);
323       }
324       connection.incCount();
325       return connection;
326     }
327   }
328 
329   /**
330    * Create a new HConnection instance using the passed <code>conf</code> instance.
331    * <p>Note: This bypasses the usual HConnection life cycle management done by
332    * {@link #getConnection(Configuration)}. The caller is responsible for
333    * calling {@link HConnection#close()} on the returned connection instance.
334    *
335    * This is the recommended way to create HConnections.
336    * <pre>
337    * {@code
338    * HConnection connection = HConnectionManager.createConnection(conf);
339    * HTableInterface table = connection.getTable("mytable");
340    * table.get(...);
341    * ...
342    * table.close();
343    * connection.close();
344    * }</pre>
344    *
345    * @param conf configuration
346    * @return HConnection object for <code>conf</code>
347    * @throws ZooKeeperConnectionException
348    */
349   public static HConnection createConnection(Configuration conf)
350   throws IOException {
351     UserProvider provider = UserProvider.instantiate(conf);
352     return createConnection(conf, false, null, provider.getCurrent());
353   }
354 
355   /**
356    * Create a new HConnection instance using the passed <code>conf</code> instance.
357    * <p>Note: This bypasses the usual HConnection life cycle management done by
358    * {@link #getConnection(Configuration)}. The caller is responsible for
359    * calling {@link HConnection#close()} on the returned connection instance.
360    * This is the recommended way to create HConnections.
361    * <pre>
362    * {@code
363    * ExecutorService pool = ...;
364    * HConnection connection = HConnectionManager.createConnection(conf, pool);
365    * HTableInterface table = connection.getTable("mytable");
366    * table.get(...);
367    * ...
368    * table.close();
369    * connection.close();
370    * }</pre>
370    * @param conf configuration
371    * @param pool the thread pool to use for batch operation in HTables used via this HConnection
372    * @return HConnection object for <code>conf</code>
373    * @throws ZooKeeperConnectionException
374    */
375   public static HConnection createConnection(Configuration conf, ExecutorService pool)
376   throws IOException {
377     UserProvider provider = UserProvider.instantiate(conf);
378     return createConnection(conf, false, pool, provider.getCurrent());
379   }
380 
381   /**
382    * Create a new HConnection instance using the passed <code>conf</code> instance.
383    * <p>Note: This bypasses the usual HConnection life cycle management done by
384    * {@link #getConnection(Configuration)}. The caller is responsible for
385    * calling {@link HConnection#close()} on the returned connection instance.
386    * This is the recommended way to create HConnections.
387    * <pre>
388    * {@code
389    * User user = ...;
390    * HConnection connection = HConnectionManager.createConnection(conf, user);
391    * HTableInterface table = connection.getTable("mytable");
392    * table.get(...);
393    * ...
394    * table.close();
395    * connection.close();
396    * }</pre>
396    * @param conf configuration
397    * @param user the user the connection is for
398    * @return HConnection object for <code>conf</code>
399    * @throws ZooKeeperConnectionException
400    */
401   public static HConnection createConnection(Configuration conf, User user)
402   throws IOException {
403     return createConnection(conf, false, null, user);
404   }
405 
406   /**
407    * Create a new HConnection instance using the passed <code>conf</code> instance.
408    * <p>Note: This bypasses the usual HConnection life cycle management done by
409    * {@link #getConnection(Configuration)}. The caller is responsible for
410    * calling {@link HConnection#close()} on the returned connection instance.
411    * This is the recommended way to create HConnections.
412    * <pre>
413    * {@code
414    * ExecutorService pool = ...;
415    * User user = ...;
416    * HConnection connection = HConnectionManager.createConnection(conf, pool, user);
417    * HTableInterface table = connection.getTable("mytable");
418    * table.get(...);
419    * ...
420    * table.close();
421    * connection.close();
422    * }</pre>
421    * @param conf configuration
422    * @param pool the thread pool to use for batch operation in HTables used via this HConnection
423    * @param user the user the connection is for
424    * @return HConnection object for <code>conf</code>
425    * @throws ZooKeeperConnectionException
426    */
427   public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
428   throws IOException {
429     return createConnection(conf, false, pool, user);
430   }
431 
432   @Deprecated
433   static HConnection createConnection(final Configuration conf, final boolean managed)
434       throws IOException {
435     UserProvider provider = UserProvider.instantiate(conf);
436     return createConnection(conf, managed, null, provider.getCurrent());
437   }
438 
439   @Deprecated
440   static HConnection createConnection(final Configuration conf, final boolean managed,
441       final ExecutorService pool, final User user)
442   throws IOException {
443     String className = conf.get("hbase.client.connection.impl",
444       HConnectionManager.HConnectionImplementation.class.getName());
445     Class<?> clazz = null;
446     try {
447       clazz = Class.forName(className);
448     } catch (ClassNotFoundException e) {
449       throw new IOException(e);
450     }
451     try {
452       // Default HCM#HCI is not accessible; make it so before invoking.
453       Constructor<?> constructor =
454         clazz.getDeclaredConstructor(Configuration.class,
455           boolean.class, ExecutorService.class, User.class);
456       constructor.setAccessible(true);
457       return (HConnection) constructor.newInstance(conf, managed, pool, user);
458     } catch (Exception e) {
459       throw new IOException(e);
460     }
461   }
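  // Note: as read above, the concrete connection class is pluggable through the
  // "hbase.client.connection.impl" key (a sketch; the class name is hypothetical):
  //   conf.set("hbase.client.connection.impl", "com.example.MyConnection");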
462 
463   /**
464    * Delete connection information for the instance specified by passed configuration.
465    * If there are no more references to the designated connection, this method will
466    * then close the connection to the ZooKeeper ensemble and let go of all associated resources.
467    *
468    * @param conf configuration whose identity is used to find {@link HConnection} instance.
469    * @deprecated
470    */
471   public static void deleteConnection(Configuration conf) {
472     deleteConnection(new HConnectionKey(conf), false);
473   }
474 
475   /**
476    * Clean up a known stale connection.
477    * This will close the connection to the ZooKeeper ensemble and let go of all resources.
478    *
479    * @param connection the stale connection to clean up
480    * @deprecated
481    */
482   public static void deleteStaleConnection(HConnection connection) {
483     deleteConnection(connection, true);
484   }
485 
486   /**
487    * Delete information for all connections. Whether each connection is actually closed
488    * depends on the <code>staleConnection</code> boolean and its reference count. In general,
489    * you should call this with <code>staleConnection</code> set to true.
490    * @deprecated
491    */
492   public static void deleteAllConnections(boolean staleConnection) {
493     synchronized (CONNECTION_INSTANCES) {
494       Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
495       connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
496       for (HConnectionKey connectionKey : connectionKeys) {
497         deleteConnection(connectionKey, staleConnection);
498       }
499       CONNECTION_INSTANCES.clear();
500     }
501   }
502 
503   /**
504    * Delete information for all connections.
505    * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
506    */
507   @Deprecated
508   public static void deleteAllConnections() {
509     deleteAllConnections(false);
510   }
511 
512 
513   @Deprecated
514   private static void deleteConnection(HConnection connection, boolean staleConnection) {
515     synchronized (CONNECTION_INSTANCES) {
516       for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
517         if (e.getValue() == connection) {
518           deleteConnection(e.getKey(), staleConnection);
519           break;
520         }
521       }
522     }
523   }
524 
525   @Deprecated
526   private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
527     synchronized (CONNECTION_INSTANCES) {
528       HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
529       if (connection != null) {
530         connection.decCount();
531         if (connection.isZeroReference() || staleConnection) {
532           CONNECTION_INSTANCES.remove(connectionKey);
533           connection.internalClose();
534         }
535       } else {
536         LOG.error("Connection not found in the list, can't delete it "+
537           "(connection key=" + connectionKey + "). Maybe the key was modified?", new Exception());
538       }
539     }
540   }
541 
542   /**
543    * Provided for unit tests that verify the behavior of the region
544    * location cache prefetch.
545    * @return Number of cached regions for the table.
546    * @throws ZooKeeperConnectionException
547    */
548   static int getCachedRegionCount(Configuration conf, final TableName tableName)
549   throws IOException {
550     return execute(new HConnectable<Integer>(conf) {
551       @Override
552       public Integer connect(HConnection connection) {
553         return ((HConnectionImplementation)connection).getNumberOfCachedRegionLocations(tableName);
554       }
555     });
556   }
557 
558   /**
559    * This convenience method invokes the given {@link HConnectable#connect}
560    * implementation using a {@link HConnection} instance that lasts just for the
561    * duration of the invocation.
562    *
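   * <p>A minimal usage sketch (the table name is illustrative):
   * <pre>
   * {@code
   * List<HRegionLocation> locations = HConnectionManager.execute(
   *     new HConnectable<List<HRegionLocation>>(conf) {
   *       @Override
   *       public List<HRegionLocation> connect(HConnection connection) throws IOException {
   *         return connection.locateRegions(TableName.valueOf("table1"));
   *       }
   *     });
   * }</pre>
   *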
563    * @param <T> the return type of the connect method
564    * @param connectable the {@link HConnectable} instance
565    * @return the value returned by the connect method
566    * @throws IOException
567    */
568   @InterfaceAudience.Private
569   public static <T> T execute(HConnectable<T> connectable) throws IOException {
570     if (connectable == null || connectable.conf == null) {
571       return null;
572     }
573     Configuration conf = connectable.conf;
574     HConnection connection = HConnectionManager.getConnection(conf);
575     boolean connectSucceeded = false;
576     try {
577       T returnValue = connectable.connect(connection);
578       connectSucceeded = true;
579       return returnValue;
580     } finally {
581       try {
582         connection.close();
583       } catch (Exception e) {
584         ExceptionUtil.rethrowIfInterrupt(e);
585         if (connectSucceeded) {
586           throw new IOException("The connection to " + connection
587               + " could not be deleted.", e);
588         }
589       }
590     }
591   }
592 
593   /** Encapsulates connection to zookeeper and regionservers.*/
594   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
595       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
596       justification="Access to the concurrent hash map is under a lock so should be fine.")
597   public static class HConnectionImplementation implements HConnection, Closeable {
598     static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
599     private final long pause;
600     private final int numTries;
601     final int rpcTimeout;
602     private NonceGenerator nonceGenerator = null;
603     private final boolean usePrefetch;
604     private final int prefetchRegionLimit;
605 
606     private volatile boolean closed;
607     private volatile boolean aborted;
608 
609     // package protected for the tests
610     ClusterStatusListener clusterStatusListener;
611 
612     private final Object userRegionLock = new Object();
613 
614     // We have a single lock for master & zk to prevent deadlocks. Having
615     //  one lock for ZK and one lock for master is not possible:
616     //  When creating a connection to master, we need a connection to ZK to get
617     //  its address. But another thread could have taken the ZK lock, and could
618     //  be waiting for the master lock => deadlock.
619     private final Object masterAndZKLock = new Object();
620 
621     private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
622     private final DelayedClosing delayedClosing =
623       DelayedClosing.createAndStart(this);
624 
625     // thread executor shared by all HTableInterface instances created
626     // by this connection
627     private volatile ExecutorService batchPool = null;
628     private volatile boolean cleanupPool = false;
629 
630     private final Configuration conf;
631 
632     // cache the configuration value for tables so that we can avoid calling
633     // the expensive Configuration to fetch the value multiple times.
634     private final TableConfiguration tableConfig;
635 
636     // Client rpc instance.
637     private RpcClient rpcClient;
638 
639     /**
640      * Map of table to that table's {@link HRegionLocation}s, keyed by region start key.
641      */
642     private final ConcurrentMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>
643         cachedRegionLocations =
644       new ConcurrentHashMap<TableName, ConcurrentSkipListMap<byte[], HRegionLocation>>();
645 
646     // The presence of a server in the map implies it's likely that there is an
647     // entry in cachedRegionLocations that maps to this server; but the absence
648     // of a server in this map guarantees that there is no entry in the cache that
649     // maps to the absent server.
650     // Access to this attribute must be protected by a lock on cachedRegionLocations.
651     private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
652 
653     // Region cache prefetch is enabled by default. This set contains all
654     // tables whose region cache prefetch is disabled.
655     private final Set<Integer> regionCachePrefetchDisabledTables =
656       new CopyOnWriteArraySet<Integer>();
657 
658     private int refCount;
659 
660     // indicates whether this connection's life cycle is managed (by us)
661     private boolean managed;
662 
663     private User user;
664 
665     private RpcRetryingCallerFactory rpcCallerFactory;
666 
667     private RpcControllerFactory rpcControllerFactory;
668 
669     // single tracker per connection
670     private final ServerStatisticTracker stats;
671 
672     private final ClientBackoffPolicy backoffPolicy;
673 
674     /**
675      * Cluster registry of basic info such as clusterid and meta region location.
676      */
677      Registry registry;
678 
679      HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
680        this(conf, managed, null, null);
681      }
682 
683     /**
684      * Constructor.
685      * @param conf Configuration object
686      * @param managed If true, does not do a full shutdown on close; i.e. no cleanup of the
687      * connection to zk and no shutdown of all services; we just close down the resources this
688      * connection was responsible for and decrement usage counters.  It is up to the caller to do
689      * the full cleanup.  It is set when we want connection sharing going on -- reuse of the zk
690      * connection, cached region locations, established regionserver connections, etc.  When
691      * connections are shared, we do reference counting and only do the full cleanup when there
692      * are no more users of an HConnectionImplementation instance.
693      */
694     HConnectionImplementation(Configuration conf, boolean managed,
695         ExecutorService pool, User user) throws IOException {
696       this(conf);
697       this.user = user;
698       this.batchPool = pool;
699       this.managed = managed;
700       this.registry = setupRegistry();
701       retrieveClusterId();
702 
703       this.rpcClient = new RpcClient(this.conf, this.clusterId);
704 
705       // Do we publish the status?
706       boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
707           HConstants.STATUS_PUBLISHED_DEFAULT);
708       Class<? extends ClusterStatusListener.Listener> listenerClass =
709           conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
710               ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
711               ClusterStatusListener.Listener.class);
712       if (shouldListen) {
713         if (listenerClass == null) {
714           LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
715               ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening for status");
716         } else {
717           clusterStatusListener = new ClusterStatusListener(
718               new ClusterStatusListener.DeadServerHandler() {
719                 @Override
720                 public void newDead(ServerName sn) {
721                   clearCaches(sn);
722                   rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
723                       new SocketException(sn.getServerName() +
724                           " is dead: closing its connection."));
725                 }
726               }, conf, listenerClass);
727         }
728       }
729 
730       this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats);
731       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
732     }
733 
734     /** Dummy nonce generator for disabled nonces. */
735     private static class NoNonceGenerator implements NonceGenerator {
736       @Override
737       public long getNonceGroup() {
738         return HConstants.NO_NONCE;
739       }
740       @Override
741       public long newNonce() {
742         return HConstants.NO_NONCE;
743       }
744     }
745 
746     /**
747      * For tests.
748      */
749     protected HConnectionImplementation(Configuration conf) {
750       this.conf = conf;
751       this.tableConfig = new TableConfiguration(conf);
752       this.closed = false;
753       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
754           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
755       this.numTries = tableConfig.getRetriesNumber();
756       this.rpcTimeout = conf.getInt(
757           HConstants.HBASE_RPC_TIMEOUT_KEY,
758           HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
759       if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
760         synchronized (HConnectionManager.nonceGeneratorCreateLock) {
761           if (HConnectionManager.nonceGenerator == null) {
762             HConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
763           }
764           this.nonceGenerator = HConnectionManager.nonceGenerator;
765         }
766       } else {
767         this.nonceGenerator = new NoNonceGenerator();
768       }
769 
770       this.stats = ServerStatisticTracker.create(conf);
771       this.usePrefetch = conf.getBoolean(HConstants.HBASE_CLIENT_PREFETCH,
772           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH);
773       this.prefetchRegionLimit = conf.getInt(
774           HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
775           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
776       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
777       this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats);
778       this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
779       
780     }
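    // Note: nonce generation can be disabled per client through the
    // CLIENT_NONCES_ENABLED_KEY read above (a configuration sketch):
    //   conf.setBoolean("hbase.client.nonces.enabled", false); // -> NoNonceGenerator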
781 
782     @Override
783     public HTableInterface getTable(String tableName) throws IOException {
784       return getTable(TableName.valueOf(tableName));
785     }
786 
787     @Override
788     public HTableInterface getTable(byte[] tableName) throws IOException {
789       return getTable(TableName.valueOf(tableName));
790     }
791 
792     @Override
793     public HTableInterface getTable(TableName tableName) throws IOException {
794       return getTable(tableName, getBatchPool());
795     }
796 
797     @Override
798     public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
799       return getTable(TableName.valueOf(tableName), pool);
800     }
801 
802     @Override
803     public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
804       return getTable(TableName.valueOf(tableName), pool);
805     }
806 
807     @Override
808     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
809       if (managed) {
810         throw new IOException("The connection has to be unmanaged.");
811       }
812       return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory,
813         pool);
814     }
815 
816     private ExecutorService getBatchPool() {
817       if (batchPool == null) {
818         // shared HTable thread executor not yet initialized
819         synchronized (this) {
820           if (batchPool == null) {
821             int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256);
822             int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256);
823             if (maxThreads == 0) {
824               maxThreads = Runtime.getRuntime().availableProcessors() * 8;
825             }
826             if (coreThreads == 0) {
827               coreThreads = Runtime.getRuntime().availableProcessors() * 8;
828             }
829             long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
830             LinkedBlockingQueue<Runnable> workQueue =
831               new LinkedBlockingQueue<Runnable>(maxThreads *
832                 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
833                   HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
834             ThreadPoolExecutor tpe = new ThreadPoolExecutor(
835                 coreThreads,
836                 maxThreads,
837                 keepAliveTime,
838                 TimeUnit.SECONDS,
839                 workQueue,
840                 Threads.newDaemonThreadFactory(toString() + "-shared-"));
841             tpe.allowCoreThreadTimeOut(true);
842             this.batchPool = tpe;
843           }
844           this.cleanupPool = true;
845         }
846       }
847       return this.batchPool;
848     }
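    // A tuning sketch for the pool keys read above (values are purely illustrative,
    // not recommendations):
    //   conf.setInt("hbase.hconnection.threads.max", 64);
    //   conf.setInt("hbase.hconnection.threads.core", 8);
    //   conf.setLong("hbase.hconnection.threads.keepalivetime", 60);
    // A value of 0 for either threads key falls back to 8 * the available processors.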
849 
850     protected ExecutorService getCurrentBatchPool() {
851       return batchPool;
852     }
853 
854     private void shutdownBatchPool() {
855       if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
856         this.batchPool.shutdown();
857         try {
858           if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
859             this.batchPool.shutdownNow();
860           }
861         } catch (InterruptedException e) {
862           this.batchPool.shutdownNow();
863         }
864       }
865     }
866 
867     /**
868      * @return The cluster registry implementation to use.
869      * @throws IOException
870      */
871     private Registry setupRegistry() throws IOException {
872       String registryClass = this.conf.get("hbase.client.registry.impl",
873         ZooKeeperRegistry.class.getName());
874       Registry registry = null;
875       try {
876         registry = (Registry)Class.forName(registryClass).newInstance();
877       } catch (Throwable t) {
878         throw new IOException(t);
879       }
880       registry.init(this);
881       return registry;
882     }
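    // The registry implementation is likewise pluggable (a sketch; the class name is
    // hypothetical and, as reflected above, it must have a no-arg constructor):
    //   conf.set("hbase.client.registry.impl", "com.example.MyRegistry");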
883 
884     /**
885      * For tests only.
886      * @param rpcClient Client we should use instead.
887      * @return Previous rpcClient
888      */
889     RpcClient setRpcClient(final RpcClient rpcClient) {
890       RpcClient oldRpcClient = this.rpcClient;
891       this.rpcClient = rpcClient;
892       return oldRpcClient;
893     }
894 
895     /**
896      * An identifier that will remain the same for a given connection.
897      * @return a stable identifier for this connection
898      */
899     public String toString() {
900       return "hconnection-0x" + Integer.toHexString(hashCode());
901     }
902 
903     protected String clusterId = null;
904 
905     void retrieveClusterId() {
906       if (clusterId != null) return;
907       this.clusterId = this.registry.getClusterId();
908       if (clusterId == null) {
909         clusterId = HConstants.CLUSTER_ID_DEFAULT;
910         LOG.debug("clusterid came back null, using default " + clusterId);
911       }
912     }
913 
914     @Override
915     public Configuration getConfiguration() {
916       return this.conf;
917     }
918 
919     private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
920       throws MasterNotRunningException {
921       String errorMsg;
922       try {
923         if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
924           errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
925             + "It should have been written by the master. "
926             + "Check the value configured in 'zookeeper.znode.parent'. "
927             + "There could be a mismatch with the one configured in the master.";
928           LOG.error(errorMsg);
929           throw new MasterNotRunningException(errorMsg);
930         }
931       } catch (KeeperException e) {
932         errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
933         LOG.error(errorMsg);
934         throw new MasterNotRunningException(errorMsg, e);
935       }
936     }
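    // For example, if the master was started with a non-default parent znode, clients
    // must carry the same value (a sketch; the path is hypothetical):
    //   conf.set("zookeeper.znode.parent", "/hbase-secure");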
937 
938     /**
939      * @return true if the master is running, throws an exception otherwise
940      * @throws MasterNotRunningException - if the master is not running
941      * @throws ZooKeeperConnectionException
942      */
943     @Override
944     public boolean isMasterRunning()
945     throws MasterNotRunningException, ZooKeeperConnectionException {
946       // When getting the master connection, we check it's running,
947       // so if there is no exception, it means we've been able to get a
948       // connection on a running master
949       MasterKeepAliveConnection m = getKeepAliveMasterService();
950       m.close();
951       return true;
952     }
953 
954     @Override
955     public HRegionLocation getRegionLocation(final TableName tableName,
956         final byte [] row, boolean reload)
957     throws IOException {
958       return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
959     }
960 
961     @Override
962     public HRegionLocation getRegionLocation(final byte[] tableName,
963         final byte [] row, boolean reload)
964     throws IOException {
965       return getRegionLocation(TableName.valueOf(tableName), row, reload);
966     }
967 
968     @Override
969     public boolean isTableEnabled(TableName tableName) throws IOException {
970       return this.registry.isTableOnlineState(tableName, true);
971     }
972 
973     @Override
974     public boolean isTableEnabled(byte[] tableName) throws IOException {
975       return isTableEnabled(TableName.valueOf(tableName));
976     }
977 
978     @Override
979     public boolean isTableDisabled(TableName tableName) throws IOException {
980       return this.registry.isTableOnlineState(tableName, false);
981     }
982 
983     @Override
984     public boolean isTableDisabled(byte[] tableName) throws IOException {
985       return isTableDisabled(TableName.valueOf(tableName));
986     }
987 
988     @Override
989     public boolean isTableAvailable(final TableName tableName) throws IOException {
990       final AtomicBoolean available = new AtomicBoolean(true);
991       final AtomicInteger regionCount = new AtomicInteger(0);
992       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
993         @Override
994         public boolean processRow(Result row) throws IOException {
995           HRegionInfo info = MetaScanner.getHRegionInfo(row);
996           if (info != null && !info.isSplitParent()) {
997             if (tableName.equals(info.getTable())) {
998               ServerName server = HRegionInfo.getServerName(row);
999               if (server == null) {
1000                 available.set(false);
1001                 return false;
1002               }
1003               regionCount.incrementAndGet();
1004             } else if (tableName.compareTo(info.getTable()) < 0) {
1005               // Return if we are done with the current table
1006               return false;
1007             }
1008           }
1009           return true;
1010         }
1011       };
1012       MetaScanner.metaScan(conf, this, visitor, tableName);
1013       return available.get() && (regionCount.get() > 0);
1014     }
1015 
1016     @Override
1017     public boolean isTableAvailable(final byte[] tableName) throws IOException {
1018       return isTableAvailable(TableName.valueOf(tableName));
1019     }
1020 
1021     @Override
1022     public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
1023         throws IOException {
1024       final AtomicBoolean available = new AtomicBoolean(true);
1025       final AtomicInteger regionCount = new AtomicInteger(0);
1026       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1027         @Override
1028         public boolean processRow(Result row) throws IOException {
1029           HRegionInfo info = MetaScanner.getHRegionInfo(row);
1030           if (info != null && !info.isSplitParent()) {
1031             if (tableName.equals(info.getTable())) {
1032               ServerName server = HRegionInfo.getServerName(row);
1033               if (server == null) {
1034                 available.set(false);
1035                 return false;
1036               }
1037               if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
1038                 for (byte[] splitKey : splitKeys) {
1039                   // Just check if the splitkey is available
1040                   if (Bytes.equals(info.getStartKey(), splitKey)) {
1041                     regionCount.incrementAndGet();
1042                     break;
1043                   }
1044                 }
1045               } else {
1046                 // The region with the empty start row should always be counted
1047                 regionCount.incrementAndGet();
1048               }
1049             } else if (tableName.compareTo(info.getTable()) < 0) {
1050               // Return if we are done with the current table
1051               return false;
1052             }
1053           }
1054           return true;
1055         }
1056       };
1057       MetaScanner.metaScan(conf, this, visitor, tableName);
1058       // +1 needs to be added so that the empty start row is also taken into account
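      // (e.g. splitKeys {b, c} should yield the three regions [,b), [b,c) and [c,), i.e. 2 + 1)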
1059       return available.get() && (regionCount.get() == splitKeys.length + 1);
1060     }
1061 
1062     @Override
1063     public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1064         throws IOException {
1065       return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1066     }
1067 
1068     @Override
1069     public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1070       return locateRegion(HRegionInfo.getTable(regionName),
1071           HRegionInfo.getStartKey(regionName), false, true);
1072     }
1073 
1074     @Override
1075     public boolean isDeadServer(ServerName sn) {
1076       if (clusterStatusListener == null) {
1077         return false;
1078       } else {
1079         return clusterStatusListener.isDeadServer(sn);
1080       }
1081     }
1082 
1083     @Override
1084     public List<HRegionLocation> locateRegions(final TableName tableName)
1085     throws IOException {
1086      return locateRegions(tableName, false, true);
1087     }
1088 
1089     @Override
1090     public List<HRegionLocation> locateRegions(final byte[] tableName)
1091     throws IOException {
1092       return locateRegions(TableName.valueOf(tableName));
1093     }
1094 
1095     @Override
1096     public List<HRegionLocation> locateRegions(final TableName tableName,
1097         final boolean useCache, final boolean offlined) throws IOException {
1098       NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, this,
1099           tableName, offlined);
1100       final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
1101       for (HRegionInfo regionInfo : regions.keySet()) {
1102         locations.add(locateRegion(tableName, regionInfo.getStartKey(), useCache, true));
1103       }
1104       return locations;
1105     }
1106 
1107     @Override
1108     public List<HRegionLocation> locateRegions(final byte[] tableName,
1109        final boolean useCache, final boolean offlined) throws IOException {
1110       return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1111     }
1112 
1113     @Override
1114     public HRegionLocation locateRegion(final TableName tableName,
1115         final byte [] row)
1116     throws IOException{
1117       return locateRegion(tableName, row, true, true);
1118     }
1119 
1120     @Override
1121     public HRegionLocation locateRegion(final byte[] tableName,
1122         final byte [] row)
1123     throws IOException{
1124       return locateRegion(TableName.valueOf(tableName), row);
1125     }
1126 
1127     @Override
1128     public HRegionLocation relocateRegion(final TableName tableName,
1129         final byte [] row) throws IOException{
1130       // Since this is an explicit request not to use any caching, finding
1131      // cached locations for disabled tables is not desirable.  This check ensures that an
1132      // exception is thrown the first time a disabled table is interacted with.
1133       if (isTableDisabled(tableName)) {
1134         throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
1135       }
1136 
1137       return locateRegion(tableName, row, false, true);
1138     }
1139 
1140     @Override
1141     public HRegionLocation relocateRegion(final byte[] tableName,
1142         final byte [] row) throws IOException {
1143       return relocateRegion(TableName.valueOf(tableName), row);
1144     }
1145 
1146 
1147     private HRegionLocation locateRegion(final TableName tableName,
1148       final byte [] row, boolean useCache, boolean retry)
1149     throws IOException {
1150       if (this.closed) throw new IOException(toString() + " closed");
1151       if (tableName == null || tableName.getName().length == 0) {
1152         throw new IllegalArgumentException(
1153             "table name cannot be null or zero length");
1154       }
1155 
1156       if (tableName.equals(TableName.META_TABLE_NAME)) {
1157         return this.registry.getMetaRegionLocation();
1158       } else {
1159         // Region not in the cache - have to go to the meta RS
1160         return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
1161           useCache, userRegionLock, retry);
1162       }
1163     }
1164 
1165     /*
1166      * Search hbase:meta for the HRegionLocation info that contains the table and
1167      * row we're seeking. It will prefetch a certain number of regions' info and
1168      * save them to the global region cache.
1169      */
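    // How much is prefetched is governed by HConstants.HBASE_CLIENT_PREFETCH and
    // HConstants.HBASE_CLIENT_PREFETCH_LIMIT, read into usePrefetch and
    // prefetchRegionLimit in the constructor above.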
1170     private void prefetchRegionCache(final TableName tableName,
1171         final byte[] row) {
1172       // Implement a new visitor for MetaScanner, and use it to walk through
1173       // the hbase:meta
1174       MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
1175         public boolean processRow(Result result) throws IOException {
1176           try {
1177             HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
1178             if (regionInfo == null) {
1179               return true;
1180             }
1181 
1182             // possible we got a region of a different table...
1183             if (!regionInfo.getTable().equals(tableName)) {
1184               return false; // stop scanning
1185             }
1186             if (regionInfo.isOffline()) {
1187               // don't cache offline regions
1188               return true;
1189             }
1190 
1191             ServerName serverName = HRegionInfo.getServerName(result);
1192             if (serverName == null) {
1193               return true; // don't cache it
1194             }
1195             // instantiate the location
1196             long seqNum = HRegionInfo.getSeqNumDuringOpen(result);
1197             HRegionLocation loc = new HRegionLocation(regionInfo, serverName, seqNum);
1198             // cache this meta entry
1199             cacheLocation(tableName, null, loc);
1200             return true;
1201           } catch (RuntimeException e) {
1202             throw new IOException(e);
1203           }
1204         }
1205       };
1206       try {
1207         // Pre-fetch a certain number of region locations into the region cache.
1208         MetaScanner.metaScan(conf, this, visitor, tableName, row,
1209             this.prefetchRegionLimit, TableName.META_TABLE_NAME);
1210       } catch (IOException e) {
1211         if (ExceptionUtil.isInterrupt(e)) {
1212           Thread.currentThread().interrupt();
1213         }
1214       }
1215     }
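
         // A minimal MetaScannerVisitor sketch (illustrative, not part of this class;
         // 'tableName' is assumed final and in scope). It counts the online regions of
         // one table and mirrors the contract used above: return true to keep
         // scanning, false to stop once we walk past the table's rows.
         //
         //   final AtomicInteger count = new AtomicInteger(0);
         //   MetaScannerVisitor counter = new MetaScannerVisitorBase() {
         //     public boolean processRow(Result result) throws IOException {
         //       HRegionInfo info = MetaScanner.getHRegionInfo(result);
         //       if (info == null) return true;                         // skip odd rows
         //       if (!info.getTable().equals(tableName)) return false;  // past our table
         //       if (!info.isOffline()) count.incrementAndGet();
         //       return true;
         //     }
         //   };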
1216 
1217     /*
1218      * Search the hbase:meta table for the HRegionLocation
1219      * info that contains the table and row we're seeking.
1220      */
1221     private HRegionLocation locateRegionInMeta(final TableName parentTable,
1222       final TableName tableName, final byte [] row, boolean useCache,
1223       Object regionLockObject, boolean retry)
1224     throws IOException {
1225       HRegionLocation location;
1226       // If we are supposed to be using the cache, look in the cache to see if
1227       // we already have the region.
1228       if (useCache) {
1229         location = getCachedLocation(tableName, row);
1230         if (location != null) {
1231           return location;
1232         }
1233       }
1234       int localNumRetries = retry ? numTries : 1;
1235       // build the key of the meta region we should be looking for.
1236       // the extra 9's on the end are necessary to allow "exact" matches
1237       // without knowing the precise region names.
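           // For example (hypothetical names): for table "t1" and row "r1" this builds
           // the key "t1,r1,99999999999999" (HConstants.NINES). The getRowOrBefore
           // lookup below then returns the hbase:meta row whose region name sorts at
           // or immediately before that key, i.e. the region of "t1" containing "r1".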
1238       byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
1239         HConstants.NINES, false);
1240       for (int tries = 0; true; tries++) {
1241         if (tries >= localNumRetries) {
1242           throw new NoServerForRegionException("Unable to find region for "
1243             + Bytes.toStringBinary(row) + " after " + localNumRetries + " tries.");
1244         }
1245 
1246         HRegionLocation metaLocation = null;
1247         try {
1248           // locate the meta region
1249           metaLocation = locateRegion(parentTable, metaKey, true, false);
1250           // If null still, go around again.
1251           if (metaLocation == null) continue;
1252           ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
1253 
1254           Result regionInfoRow;
1255           // This block guards against two threads trying to load the meta
1256           // region at the same time. The first will load the meta region and
1257           // the second will use the value that the first one found.
1258           if (useCache) {
1259             if (TableName.META_TABLE_NAME.equals(parentTable) && usePrefetch &&
1260                 getRegionCachePrefetch(tableName)) {
1261               synchronized (regionLockObject) {
1262                 // Check the cache again for a hit in case some other thread made the
1263                 // same query while we were waiting on the lock.
1264                 location = getCachedLocation(tableName, row);
1265                 if (location != null) {
1266                   return location;
1267                 }
1268                 // If the parent table is META, we may want to pre-fetch some
1269                 // region info into the global region cache for this table.
1270                 prefetchRegionCache(tableName, row);
1271               }
1272             }
1273             location = getCachedLocation(tableName, row);
1274             if (location != null) {
1275               return location;
1276             }
1277           } else {
1278             // If we are not supposed to be using the cache, delete any existing cached location
1279             // so it won't interfere.
1280             forceDeleteCachedLocation(tableName, row);
1281           }
1282 
1283           // Query the meta region for the location of the region we are interested in
1284           regionInfoRow =
1285               ProtobufUtil.getRowOrBefore(service, metaLocation.getRegionInfo().getRegionName(),
1286                 metaKey, HConstants.CATALOG_FAMILY);
1287 
1288           if (regionInfoRow == null) {
1289             throw new TableNotFoundException(tableName);
1290           }
1291 
1292           // convert the row result into the HRegionLocation we need!
1293           HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
1294           if (regionInfo == null) {
1295             throw new IOException("HRegionInfo was null or empty in " +
1296               parentTable + ", row=" + regionInfoRow);
1297           }
1298 
1299           // possible we got a region of a different table...
1300           if (!regionInfo.getTable().equals(tableName)) {
1301             throw new TableNotFoundException(
1302                   "Table '" + tableName + "' was not found, got: " +
1303                   regionInfo.getTable() + ".");
1304           }
1305           if (regionInfo.isSplit()) {
1306             throw new RegionOfflineException("the only available region for" +
1307               " the required row is a split parent," +
1308               " the daughters should be online soon: " +
1309               regionInfo.getRegionNameAsString());
1310           }
1311           if (regionInfo.isOffline()) {
1312             throw new RegionOfflineException("the region is offline, could" +
1313               " be caused by a disable table call: " +
1314               regionInfo.getRegionNameAsString());
1315           }
1316 
1317           ServerName serverName = HRegionInfo.getServerName(regionInfoRow);
1318           if (serverName == null) {
1319             throw new NoServerForRegionException("No server address listed " +
1320               "in " + parentTable + " for region " +
1321               regionInfo.getRegionNameAsString() + " containing row " +
1322               Bytes.toStringBinary(row));
1323           }
1324 
1325           if (isDeadServer(serverName)){
1326             throw new RegionServerStoppedException("hbase:meta says the region "+
1327                 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1328                 ", but it is dead.");
1329           }
1330 
1331           // Instantiate the location
1332           location = new HRegionLocation(regionInfo, serverName,
1333             HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
1334           cacheLocation(tableName, null, location);
1335           return location;
1336         } catch (TableNotFoundException e) {
1337           // if we got this error, probably means the table just plain doesn't
1338           // exist. rethrow the error immediately. this should always be coming
1339           // from the HTable constructor.
1340           throw e;
1341         } catch (IOException e) {
1342           ExceptionUtil.rethrowIfInterrupt(e);
1343 
1344           if (e instanceof RemoteException) {
1345             e = ((RemoteException)e).unwrapRemoteException();
1346           }
1347           if (tries < numTries - 1) {
1348             if (LOG.isDebugEnabled()) {
1349               LOG.debug("locateRegionInMeta parentTable=" +
1350                 parentTable + ", metaLocation=" +
1351                 ((metaLocation == null)? "null": "{" + metaLocation + "}") +
1352                 ", attempt=" + tries + " of " +
1353                 this.numTries + " failed; retrying after sleep of " +
1354                 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1355             }
1356           } else {
1357             throw e;
1358           }
1359           // Only relocate the parent region if necessary
1360           if(!(e instanceof RegionOfflineException ||
1361               e instanceof NoServerForRegionException)) {
1362             relocateRegion(parentTable, metaKey);
1363           }
1364         }
1365         try{
1366           Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1367         } catch (InterruptedException e) {
1368             throw new InterruptedIOException("Giving up trying to locate region in " +
1369             "meta: thread is interrupted.");
1370         }
1371       }
1372     }
1373 
1374     /*
1375      * Search the cache for a location that fits our table and row key.
1376      * Return null if no suitable region is located.
1377      *
1378      * @param tableName
1379      * @param row
1380      * @return Null or region location found in cache.
1381      */
1382     HRegionLocation getCachedLocation(final TableName tableName,
1383         final byte [] row) {
1384       ConcurrentSkipListMap<byte[], HRegionLocation> tableLocations =
1385         getTableLocations(tableName);
1386 
1387       Entry<byte[], HRegionLocation> e = tableLocations.floorEntry(row);
1388       if (e == null) {
1389         return null;
1390       }
1391       HRegionLocation possibleRegion = e.getValue();
1392 
1393       // make sure that the end key is greater than the row we're looking
1394       // for, otherwise the row actually belongs in the next region, not
1395       // this one. the exception case is when the endkey is
1396       // HConstants.EMPTY_END_ROW, signifying that the region we're
1397       // checking is actually the last region in the table.
1398       byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
1399       if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
1400           tableName.getRowComparator().compareRows(
1401               endKey, 0, endKey.length, row, 0, row.length) > 0) {
1402         return possibleRegion;
1403       }
1404 
1405       // Passed all the way through, so we got nothing - complete cache miss
1406       return null;
1407     }
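
         // Standalone sketch of the floorEntry technique used above (hypothetical
         // data, not part of this class). The map is sorted by start key, so
         // floorEntry(row) yields the entry with the greatest start key <= row; the
         // end-key check above then decides whether the row really falls inside it.
         //
         //   ConcurrentSkipListMap<byte[], String> regions =
         //       new ConcurrentSkipListMap<byte[], String>(Bytes.BYTES_COMPARATOR);
         //   regions.put(HConstants.EMPTY_START_ROW, "region-A");  // [ "", "m" )
         //   regions.put(Bytes.toBytes("m"), "region-B");          // [ "m", +inf )
         //   Entry<byte[], String> hit = regions.floorEntry(Bytes.toBytes("k"));
         //   // hit.getValue() == "region-A": "" is the greatest start key <= "k"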
1408 
1409     /**
1410      * Delete a cached location, no matter what it is. Called when we were told to not use cache.
1411      * @param tableName the table of the location to remove
1412      * @param row a row contained by the region whose cached location is removed
1413      */
1414     void forceDeleteCachedLocation(final TableName tableName, final byte [] row) {
1415       HRegionLocation rl = null;
1416       Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
1417       // Start to examine the cache. We can only do cache actions
1418       // if there's something in the cache for this table.
1419       rl = getCachedLocation(tableName, row);
1420       if (rl != null) {
1421         tableLocations.remove(rl.getRegionInfo().getStartKey());
1422       }
1423       if ((rl != null) && LOG.isDebugEnabled()) {
1424         LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
1425           + " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
1426           " for tableName=" + tableName + " from cache");
1427       }
1428     }
1429 
1430     /*
1431      * Delete all cached entries of a table that maps to a specific location.
1432      */
1433     @Override
1434     public void clearCaches(final ServerName serverName) {
1435       if (!this.cachedServers.contains(serverName)) {
1436         return;
1437       }
1438 
1439       boolean deletedSomething = false;
1440       synchronized (this.cachedServers) {
1441         // We block here, because if there is an error on a server, it's likely that multiple
1442         //  threads will get the error simultaneously. If there are hundreds of thousands of
1443         //  region locations to check, it's better to do this only once. A better pattern would
1444         //  be to check if the server is dead when we get the region location.
1445         if (!this.cachedServers.contains(serverName)) {
1446           return;
1447         }
1448         for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations.values()) {
1449           for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
1450             HRegionLocation value = e.getValue();
1451             if (value != null
1452                 && serverName.equals(value.getServerName())) {
1453               tableLocations.remove(e.getKey());
1454               deletedSomething = true;
1455             }
1456           }
1457         }
1458         this.cachedServers.remove(serverName);
1459       }
1460       if (deletedSomething && LOG.isDebugEnabled()) {
1461         LOG.debug("Removed all cached region locations that map to " + serverName);
1462       }
1463     }
1464 
1465     /*
1466      * @param tableName
1467      * @return Map of cached locations for passed <code>tableName</code>
1468      */
1469     private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
1470         final TableName tableName) {
1471       // find the map of cached locations for this table
1472       ConcurrentSkipListMap<byte[], HRegionLocation> result;
1473       result = this.cachedRegionLocations.get(tableName);
1474       // if tableLocations for this table isn't built yet, make one
1475       if (result == null) {
1476         result = new ConcurrentSkipListMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
1477         ConcurrentSkipListMap<byte[], HRegionLocation> old =
1478             this.cachedRegionLocations.putIfAbsent(tableName, result);
1479         if (old != null) {
1480           return old;
1481         }
1482       }
1483       return result;
1484     }
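
         // The putIfAbsent dance above is the standard race-safe lazy-init idiom for
         // a ConcurrentMap; a generic sketch of the same pattern (names hypothetical):
         //
         //   V fresh = makeValue();
         //   V prior = map.putIfAbsent(key, fresh);
         //   V value = (prior != null) ? prior : fresh;  // all racers see one instance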
1485 
1486     @Override
1487     public void clearRegionCache() {
1488       this.cachedRegionLocations.clear();
1489       this.cachedServers.clear();
1490     }
1491 
1492     @Override
1493     public void clearRegionCache(final TableName tableName) {
1494       this.cachedRegionLocations.remove(tableName);
1495     }
1496 
1497     @Override
1498     public void clearRegionCache(final byte[] tableName) {
1499       clearRegionCache(TableName.valueOf(tableName));
1500     }
1501 
1502     /**
1503      * Put a newly discovered HRegionLocation into the cache.
1504      * @param tableName The table name.
1505      * @param source the source of the new location, if it's not coming from meta
1506      * @param location the new location
1507      */
1508     private void cacheLocation(final TableName tableName, final HRegionLocation source,
1509         final HRegionLocation location) {
1510       boolean isFromMeta = (source == null);
1511       byte [] startKey = location.getRegionInfo().getStartKey();
1512       ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
1513       HRegionLocation oldLocation = tableLocations.putIfAbsent(startKey, location);
1514       boolean isNewCacheEntry = (oldLocation == null);
1515       if (isNewCacheEntry) {
1516         cachedServers.add(location.getServerName());
1517         return;
1518       }
1519       boolean updateCache;
1520       // If the server in cache sends us a redirect, assume it's always valid.
1521       if (oldLocation.equals(source)) {
1522         updateCache = true;
1523       } else {
1524         long newLocationSeqNum = location.getSeqNum();
1525         // Meta record is stale - some (probably the same) server has closed the region
1526         // with later seqNum and told us about the new location.
1527         boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
1528         // Same as above for redirect. However, in this case, if the number is equal to previous
1529         // record, the most common case is that first the region was closed with seqNum, and then
1530         // opened with the same seqNum; hence we will ignore the redirect.
1531         // There are so many corner cases with various combinations of opens and closes that
1532         // an additional counter on top of seqNum would be necessary to handle them all.
1533         boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
1534         boolean isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
1535         updateCache = (!isStaleUpdate);
1536       }
1537       if (updateCache) {
1538         tableLocations.replace(startKey, oldLocation, location);
1539         cachedServers.add(location.getServerName());
1540       }
1541     }
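
         // Worked example of the staleness rules above, with a cached location at
         // seqNum 7 (all numbers hypothetical):
         //   update from meta (source == null) with seqNum 5  -> stale (7 > 5), ignored
         //   update from meta (source == null) with seqNum 7  -> applied (meta ties win)
         //   redirect from some server with seqNum 7          -> stale (7 >= 7), ignored
         //   redirect from some server with seqNum 9          -> applied
         // A redirect sent by the very server we have cached is always applied.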
1542 
1543     // Map keyed by service name + regionserver to service stub implementation
1544     private final ConcurrentHashMap<String, Object> stubs =
1545       new ConcurrentHashMap<String, Object>();
1546     // Map of locks used creating service stubs per regionserver.
1547     private final ConcurrentHashMap<String, String> connectionLock =
1548       new ConcurrentHashMap<String, String>();
1549 
1550     /**
1551      * State of the MasterService connection/setup.
1552      */
1553     static class MasterServiceState {
1554       HConnection connection;
1555       MasterService.BlockingInterface stub;
1556       int userCount;
1557       long keepAliveUntil = Long.MAX_VALUE;
1558 
1559       MasterServiceState(final HConnection connection) {
1560         super();
1561         this.connection = connection;
1562       }
1563 
1564       @Override
1565       public String toString() {
1566         return "MasterService";
1567       }
1568 
1569       Object getStub() {
1570         return this.stub;
1571       }
1572 
1573       void clearStub() {
1574         this.stub = null;
1575       }
1576 
1577       boolean isMasterRunning() throws ServiceException {
1578         IsMasterRunningResponse response =
1579           this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1580         return response != null? response.getIsMasterRunning(): false;
1581       }
1582     }
1583 
1584     /**
1585      * Makes a client-side stub for master services. Sub-class to specialize.
1586      * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
1587      * when setting up the MasterMonitorService and MasterAdminService.
1588      */
1589     abstract class StubMaker {
1590       /**
1591        * Returns the name of the service stub being created.
1592        */
1593       protected abstract String getServiceName();
1594 
1595       /**
1596        * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
1597        * @param channel the RPC channel the stub is built on
1598        */
1599       protected abstract Object makeStub(final BlockingRpcChannel channel);
1600 
1601       /**
1602        * Once setup, check it works by doing isMasterRunning check.
1603        * @throws ServiceException
1604        */
1605       protected abstract void isMasterRunning() throws ServiceException;
1606 
1607       /**
1608        * Create a stub. Try once only.  It is not typed because there is no common type across
1609        * protobuf services or their interfaces.  Let the caller do appropriate casting.
1610        * @return A stub for master services.
1611        * @throws IOException
1612        * @throws KeeperException
1613        * @throws ServiceException
1614        */
1615       private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
1616         ZooKeeperKeepAliveConnection zkw;
1617         try {
1618           zkw = getKeepAliveZooKeeperWatcher();
1619         } catch (IOException e) {
1620           ExceptionUtil.rethrowIfInterrupt(e);
1621           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
1622         }
1623         try {
1624           checkIfBaseNodeAvailable(zkw);
1625           ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
1626           if (sn == null) {
1627             String msg = "ZooKeeper available but no active master location found";
1628             LOG.info(msg);
1629             throw new MasterNotRunningException(msg);
1630           }
1631           if (isDeadServer(sn)) {
1632             throw new MasterNotRunningException(sn + " is dead.");
1633           }
1634           // Use the security info interface name as our stub key
1635           String key = getStubKey(getServiceName(), sn.getHostAndPort());
1636           connectionLock.putIfAbsent(key, key);
1637           Object stub = null;
1638           synchronized (connectionLock.get(key)) {
1639             stub = stubs.get(key);
1640             if (stub == null) {
1641               BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
1642                 user, rpcTimeout);
1643               stub = makeStub(channel);
1644               isMasterRunning();
1645               stubs.put(key, stub);
1646             }
1647           }
1648           return stub;
1649         } finally {
1650           zkw.close();
1651         }
1652       }
1653 
1654       /**
1655        * Create a stub against the master.  Retry if necessary.
1656        * @return A stub to do <code>intf</code> against the master
1657        * @throws MasterNotRunningException
1658        */
1659       @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
1660       Object makeStub() throws MasterNotRunningException {
1661         // The lock must be at the beginning to prevent multiple master creations
1662           //  (and leaks) in a multithreaded context
1663         synchronized (masterAndZKLock) {
1664           Exception exceptionCaught = null;
1665           Object stub = null;
1666           int tries = 0;
1667           while (!closed && stub == null) {
1668             tries++;
                 exceptionCaught = null; // reset so a success after a failed try is not treated as a failure
1669             try {
1670               stub = makeStubNoRetries();
1671             } catch (IOException e) {
1672               exceptionCaught = e;
1673             } catch (KeeperException e) {
1674               exceptionCaught = e;
1675             } catch (ServiceException e) {
1676               exceptionCaught = e;
1677             }
1678 
1679             if (exceptionCaught != null) {
1680               // It failed. If it's not the last try, we're going to wait a little.
1681               if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) {
1682                 // tries at this point is 1 or more; decrement to start from 0.
1683                 long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
1684                 LOG.info("getMaster attempt " + tries + " of " + numTries +
1685                     " failed; retrying after sleep of " + pauseTime + ", exception=" +
1686                   exceptionCaught);
1687 
1688                 try {
1689                   Thread.sleep(pauseTime);
1690                 } catch (InterruptedException e) {
1691                   throw new MasterNotRunningException(
1692                       "Thread was interrupted while trying to connect to master.", e);
1693                 }
1694               } else {
1695                 // Enough tries, we stop now
1696                 LOG.info("getMaster attempt " + tries + " of " + numTries +
1697                     " failed; no more retrying.", exceptionCaught);
1698                 throw new MasterNotRunningException(exceptionCaught);
1699               }
                 }
1700           }
1701 
1702           if (stub == null) {
1703             // implies this.closed true
1704             throw new MasterNotRunningException("Connection was closed while trying to get master");
1705           }
1706           return stub;
1707         }
1708       }
1709     }
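
         // The pause between attempts above grows with the retry count. A minimal
         // sketch of the idea (multiplier values are illustrative; the real table
         // lives in HConstants.RETRY_BACKOFF, and ConnectionUtils.getPauseTime may
         // also add a little random jitter):
         //
         //   static final int[] BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100};
         //   static long pauseFor(long basePause, int tries) {
         //     int idx = Math.min(tries, BACKOFF.length - 1);
         //     return basePause * BACKOFF[idx];
         //   }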
1710 
1711     /**
1712      * Class to make a MasterServiceStubMaker stub.
1713      */
1714     class MasterServiceStubMaker extends StubMaker {
1715       private MasterService.BlockingInterface stub;
1716       @Override
1717       protected String getServiceName() {
1718         return MasterService.getDescriptor().getName();
1719       }
1720 
1721       @Override
1722       @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
1723       MasterService.BlockingInterface makeStub() throws MasterNotRunningException {
1724         return (MasterService.BlockingInterface)super.makeStub();
1725       }
1726 
1727       @Override
1728       protected Object makeStub(BlockingRpcChannel channel) {
1729         this.stub = MasterService.newBlockingStub(channel);
1730         return this.stub;
1731       }
1732 
1733       @Override
1734       protected void isMasterRunning() throws ServiceException {
1735         this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1736       }
1737     }
1738 
1739     @Override
1740     public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1741         throws IOException {
1742       return getAdmin(serverName, false);
1743     }
1744 
1745     @Override
1746     // Nothing is done w/ the 'master' parameter.  It is ignored.
1747     public AdminService.BlockingInterface getAdmin(final ServerName serverName,
1748       final boolean master)
1749     throws IOException {
1750       if (isDeadServer(serverName)) {
1751         throw new RegionServerStoppedException(serverName + " is dead.");
1752       }
1753       String key = getStubKey(AdminService.BlockingInterface.class.getName(),
1754         serverName.getHostAndPort());
1755       this.connectionLock.putIfAbsent(key, key);
1756       AdminService.BlockingInterface stub = null;
1757       synchronized (this.connectionLock.get(key)) {
1758         stub = (AdminService.BlockingInterface)this.stubs.get(key);
1759         if (stub == null) {
1760           BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
1761             user, this.rpcTimeout);
1762           stub = AdminService.newBlockingStub(channel);
1763           this.stubs.put(key, stub);
1764         }
1765       }
1766       return stub;
1767     }
1768 
1769     @Override
1770     public ClientService.BlockingInterface getClient(final ServerName sn)
1771     throws IOException {
1772       if (isDeadServer(sn)) {
1773         throw new RegionServerStoppedException(sn + " is dead.");
1774       }
1775       String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
1776       this.connectionLock.putIfAbsent(key, key);
1777       ClientService.BlockingInterface stub = null;
1778       synchronized (this.connectionLock.get(key)) {
1779         stub = (ClientService.BlockingInterface)this.stubs.get(key);
1780         if (stub == null) {
1781           BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
1782             user, this.rpcTimeout);
1783           stub = ClientService.newBlockingStub(channel);
1784           // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
1785           // Just fail on first actual call rather than in here on setup.
1786           this.stubs.put(key, stub);
1787         }
1788       }
1789       return stub;
1790     }
1791 
1792     static String getStubKey(final String serviceName, final String rsHostnamePort) {
1793       return serviceName + "@" + rsHostnamePort;
1794     }
1795 
1796     private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1797     private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1798     private boolean canCloseZKW = true;
1799 
1800     // keepAlive time, in ms. No reason to make it configurable.
1801     private static final long keepAlive = 5 * 60 * 1000;
1802 
1803     /**
1804      * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
1805      * @return The shared instance. Never returns null.
1806      */
1807     ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
1808       throws IOException {
1809       synchronized (masterAndZKLock) {
1810         if (keepAliveZookeeper == null) {
1811           if (this.closed) {
1812             throw new IOException(toString() + " closed");
1813           }
1814           // We don't check that our link to ZooKeeper is still valid,
1815           // but there is a retry mechanism in the ZooKeeperWatcher itself.
1816           keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
1817         }
1818         keepAliveZookeeperUserCount.incrementAndGet();
1819         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1820         return keepAliveZookeeper;
1821       }
1822     }
1823 
1824     void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
1825       if (zkw == null){
1826         return;
1827       }
1828       synchronized (masterAndZKLock) {
1829         if (keepAliveZookeeperUserCount.decrementAndGet() <= 0) {
1830           keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
1831         }
1832       }
1833     }
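
         // Acquire/release above form a reference-counted keep-alive; a sketch of
         // the life cycle (times hypothetical, keepAlive is 5 minutes):
         //
         //   getKeepAliveZooKeeperWatcher();  // count 0 -> 1, expiry = Long.MAX_VALUE
         //   getKeepAliveZooKeeperWatcher();  // count 1 -> 2
         //   releaseZooKeeperWatcher(zkw);    // count 2 -> 1, still in use
         //   releaseZooKeeperWatcher(zkw);    // count 1 -> 0, expiry = now + keepAlive
         //   // the DelayedClosing chore below closes the watcher once now > expiry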
1834 
1835     /**
1836      * Creates a Chore thread to check the connections to master & zookeeper
1837      *  and close them when they reach their closing time (
1838      *  {@link MasterServiceState#keepAliveUntil} and
1839      *  {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
1840      *  managed by the release functions and the variable {@link #keepAlive}
1841      */
1842     private static class DelayedClosing extends Chore implements Stoppable {
1843       private HConnectionImplementation hci;
1844       Stoppable stoppable;
1845 
1846       private DelayedClosing(
1847         HConnectionImplementation hci, Stoppable stoppable){
1848         super(
1849           "ZooKeeperWatcher and Master delayed closing for connection "+hci,
1850           60*1000, // We check every minute
1851           stoppable);
1852         this.hci = hci;
1853         this.stoppable = stoppable;
1854       }
1855 
1856       static DelayedClosing createAndStart(HConnectionImplementation hci){
1857         Stoppable stoppable = new Stoppable() {
1858               private volatile boolean isStopped = false;
1859               @Override public void stop(String why) { isStopped = true;}
1860               @Override public boolean isStopped() {return isStopped;}
1861             };
1862 
1863         return new DelayedClosing(hci, stoppable);
1864       }
1865 
1866       protected void closeMasterProtocol(MasterServiceState protocolState) {
1867         if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
1868           hci.closeMasterService(protocolState);
1869           protocolState.keepAliveUntil = Long.MAX_VALUE;
1870         }
1871       }
1872 
1873       @Override
1874       protected void chore() {
1875         synchronized (hci.masterAndZKLock) {
1876           if (hci.canCloseZKW) {
1877             if (System.currentTimeMillis() >
1878               hci.keepZooKeeperWatcherAliveUntil) {
1879 
1880               hci.closeZooKeeperWatcher();
1881               hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
1882             }
1883           }
1884           closeMasterProtocol(hci.masterServiceState);
1886         }
1887       }
1888 
1889       @Override
1890       public void stop(String why) {
1891         stoppable.stop(why);
1892       }
1893 
1894       @Override
1895       public boolean isStopped() {
1896         return stoppable.isStopped();
1897       }
1898     }
1899 
1900     private void closeZooKeeperWatcher() {
1901       synchronized (masterAndZKLock) {
1902         if (keepAliveZookeeper != null) {
1903           LOG.info("Closing zookeeper sessionid=0x" +
1904             Long.toHexString(
1905               keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
1906           keepAliveZookeeper.internalClose();
1907           keepAliveZookeeper = null;
1908         }
1909         keepAliveZookeeperUserCount.set(0);
1910       }
1911     }
1912 
1913     final MasterServiceState masterServiceState = new MasterServiceState(this);
1914 
1915     @Override
1916     public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
1917       return getKeepAliveMasterService();
1918     }
1919 
1920     private void resetMasterServiceState(final MasterServiceState mss) {
1921       mss.userCount++;
1922       mss.keepAliveUntil = Long.MAX_VALUE;
1923     }
1924 
1925     @Override
1926     public MasterKeepAliveConnection getKeepAliveMasterService()
1927     throws MasterNotRunningException {
1928       synchronized (masterAndZKLock) {
1929         if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1930           MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1931           this.masterServiceState.stub = stubMaker.makeStub();
1932         }
1933         resetMasterServiceState(this.masterServiceState);
1934       }
1935       // Ugly delegation just so we can add in a Close method.
1936       final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1937       return new MasterKeepAliveConnection() {
1938         MasterServiceState mss = masterServiceState;
1939         @Override
1940         public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1941         throws ServiceException {
1942           return stub.addColumn(controller, request);
1943         }
1944 
1945         @Override
1946         public DeleteColumnResponse deleteColumn(RpcController controller,
1947             DeleteColumnRequest request)
1948         throws ServiceException {
1949           return stub.deleteColumn(controller, request);
1950         }
1951 
1952         @Override
1953         public ModifyColumnResponse modifyColumn(RpcController controller,
1954             ModifyColumnRequest request)
1955         throws ServiceException {
1956           return stub.modifyColumn(controller, request);
1957         }
1958 
1959         @Override
1960         public MoveRegionResponse moveRegion(RpcController controller,
1961             MoveRegionRequest request) throws ServiceException {
1962           return stub.moveRegion(controller, request);
1963         }
1964 
1965         @Override
1966         public DispatchMergingRegionsResponse dispatchMergingRegions(
1967             RpcController controller, DispatchMergingRegionsRequest request)
1968             throws ServiceException {
1969           return stub.dispatchMergingRegions(controller, request);
1970         }
1971 
1972         @Override
1973         public AssignRegionResponse assignRegion(RpcController controller,
1974             AssignRegionRequest request) throws ServiceException {
1975           return stub.assignRegion(controller, request);
1976         }
1977 
1978         @Override
1979         public UnassignRegionResponse unassignRegion(RpcController controller,
1980             UnassignRegionRequest request) throws ServiceException {
1981           return stub.unassignRegion(controller, request);
1982         }
1983 
1984         @Override
1985         public OfflineRegionResponse offlineRegion(RpcController controller,
1986             OfflineRegionRequest request) throws ServiceException {
1987           return stub.offlineRegion(controller, request);
1988         }
1989 
1990         @Override
1991         public DeleteTableResponse deleteTable(RpcController controller,
1992             DeleteTableRequest request) throws ServiceException {
1993           return stub.deleteTable(controller, request);
1994         }
1995 
1996         @Override
1997         public EnableTableResponse enableTable(RpcController controller,
1998             EnableTableRequest request) throws ServiceException {
1999           return stub.enableTable(controller, request);
2000         }
2001 
2002         @Override
2003         public DisableTableResponse disableTable(RpcController controller,
2004             DisableTableRequest request) throws ServiceException {
2005           return stub.disableTable(controller, request);
2006         }
2007 
2008         @Override
2009         public ModifyTableResponse modifyTable(RpcController controller,
2010             ModifyTableRequest request) throws ServiceException {
2011           return stub.modifyTable(controller, request);
2012         }
2013 
2014         @Override
2015         public CreateTableResponse createTable(RpcController controller,
2016             CreateTableRequest request) throws ServiceException {
2017           return stub.createTable(controller, request);
2018         }
2019 
2020         @Override
2021         public ShutdownResponse shutdown(RpcController controller,
2022             ShutdownRequest request) throws ServiceException {
2023           return stub.shutdown(controller, request);
2024         }
2025 
2026         @Override
2027         public StopMasterResponse stopMaster(RpcController controller,
2028             StopMasterRequest request) throws ServiceException {
2029           return stub.stopMaster(controller, request);
2030         }
2031 
2032         @Override
2033         public BalanceResponse balance(RpcController controller,
2034             BalanceRequest request) throws ServiceException {
2035           return stub.balance(controller, request);
2036         }
2037 
2038         @Override
2039         public SetBalancerRunningResponse setBalancerRunning(
2040             RpcController controller, SetBalancerRunningRequest request)
2041             throws ServiceException {
2042           return stub.setBalancerRunning(controller, request);
2043         }
2044 
2045         @Override
2046         public RunCatalogScanResponse runCatalogScan(RpcController controller,
2047             RunCatalogScanRequest request) throws ServiceException {
2048           return stub.runCatalogScan(controller, request);
2049         }
2050 
2051         @Override
2052         public EnableCatalogJanitorResponse enableCatalogJanitor(
2053             RpcController controller, EnableCatalogJanitorRequest request)
2054             throws ServiceException {
2055           return stub.enableCatalogJanitor(controller, request);
2056         }
2057 
2058         @Override
2059         public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
2060             RpcController controller, IsCatalogJanitorEnabledRequest request)
2061             throws ServiceException {
2062           return stub.isCatalogJanitorEnabled(controller, request);
2063         }
2064 
2065         @Override
2066         public CoprocessorServiceResponse execMasterService(
2067             RpcController controller, CoprocessorServiceRequest request)
2068             throws ServiceException {
2069           return stub.execMasterService(controller, request);
2070         }
2071 
2072         @Override
2073         public SnapshotResponse snapshot(RpcController controller,
2074             SnapshotRequest request) throws ServiceException {
2075           return stub.snapshot(controller, request);
2076         }
2077 
2078         @Override
2079         public GetCompletedSnapshotsResponse getCompletedSnapshots(
2080             RpcController controller, GetCompletedSnapshotsRequest request)
2081             throws ServiceException {
2082           return stub.getCompletedSnapshots(controller, request);
2083         }
2084 
2085         @Override
2086         public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
2087             DeleteSnapshotRequest request) throws ServiceException {
2088           return stub.deleteSnapshot(controller, request);
2089         }
2090 
2091         @Override
2092         public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
2093             IsSnapshotDoneRequest request) throws ServiceException {
2094           return stub.isSnapshotDone(controller, request);
2095         }
2096 
2097         @Override
2098         public RestoreSnapshotResponse restoreSnapshot(
2099             RpcController controller, RestoreSnapshotRequest request)
2100             throws ServiceException {
2101           return stub.restoreSnapshot(controller, request);
2102         }
2103 
2104         @Override
2105         public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
2106             RpcController controller, IsRestoreSnapshotDoneRequest request)
2107             throws ServiceException {
2108           return stub.isRestoreSnapshotDone(controller, request);
2109         }
2110 
2111         @Override
2112         public ExecProcedureResponse execProcedure(
2113             RpcController controller, ExecProcedureRequest request)
2114             throws ServiceException {
2115           return stub.execProcedure(controller, request);
2116         }
2117 
2118         @Override
2119         public IsProcedureDoneResponse isProcedureDone(RpcController controller,
2120             IsProcedureDoneRequest request) throws ServiceException {
2121           return stub.isProcedureDone(controller, request);
2122         }
2123 
2124         @Override
2125         public IsMasterRunningResponse isMasterRunning(
2126             RpcController controller, IsMasterRunningRequest request)
2127             throws ServiceException {
2128           return stub.isMasterRunning(controller, request);
2129         }
2130 
2131         @Override
2132         public ModifyNamespaceResponse modifyNamespace(RpcController controller,
2133             ModifyNamespaceRequest request)
2134         throws ServiceException {
2135           return stub.modifyNamespace(controller, request);
2136         }
2137 
2138         @Override
2139         public CreateNamespaceResponse createNamespace(RpcController controller,
                 CreateNamespaceRequest request) throws ServiceException {
2140           return stub.createNamespace(controller, request);
2141         }
2142 
2143         @Override
2144         public DeleteNamespaceResponse deleteNamespace(RpcController controller,
                 DeleteNamespaceRequest request) throws ServiceException {
2145           return stub.deleteNamespace(controller, request);
2146         }
2147 
2148         @Override
2149         public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
                 GetNamespaceDescriptorRequest request) throws ServiceException {
2150           return stub.getNamespaceDescriptor(controller, request);
2151         }
2152 
2153         @Override
2154         public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
                 ListNamespaceDescriptorsRequest request) throws ServiceException {
2155           return stub.listNamespaceDescriptors(controller, request);
2156         }
2157 
2158         @Override
2159         public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
                 RpcController controller, ListTableDescriptorsByNamespaceRequest request)
                 throws ServiceException {
2160           return stub.listTableDescriptorsByNamespace(controller, request);
2161         }
2162 
2163         @Override
2164         public ListTableNamesByNamespaceResponse listTableNamesByNamespace(RpcController controller,
2165               ListTableNamesByNamespaceRequest request) throws ServiceException {
2166           return stub.listTableNamesByNamespace(controller, request);
2167         }
2168 
2169         @Override
2170         public void close() {
2171           release(this.mss);
2172         }
2173 
2174         @Override
2175         public GetSchemaAlterStatusResponse getSchemaAlterStatus(
2176             RpcController controller, GetSchemaAlterStatusRequest request)
2177             throws ServiceException {
2178           return stub.getSchemaAlterStatus(controller, request);
2179         }
2180 
2181         @Override
2182         public GetTableDescriptorsResponse getTableDescriptors(
2183             RpcController controller, GetTableDescriptorsRequest request)
2184             throws ServiceException {
2185           return stub.getTableDescriptors(controller, request);
2186         }
2187 
2188         @Override
2189         public GetTableNamesResponse getTableNames(
2190             RpcController controller, GetTableNamesRequest request)
2191             throws ServiceException {
2192           return stub.getTableNames(controller, request);
2193         }
2194 
2195         @Override
2196         public GetClusterStatusResponse getClusterStatus(
2197             RpcController controller, GetClusterStatusRequest request)
2198             throws ServiceException {
2199           return stub.getClusterStatus(controller, request);
2200         }
2201 
2202         @Override
2203         public TruncateTableResponse truncateTable(RpcController controller,
2204             TruncateTableRequest request) throws ServiceException {
2205           return stub.truncateTable(controller, request);
2206         }
2207       };
2208     }
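
         // Caller-side sketch (illustrative, not part of this class). The wrapper's
         // only addition over the raw stub is close(), which must always be called so
         // the shared master connection can be released:
         //
         //   MasterKeepAliveConnection master = connection.getKeepAliveMasterService();
         //   try {
         //     master.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
         //   } finally {
         //     master.close();  // drops userCount; DelayedClosing reclaims the stub later
         //   }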
2209 
2210 
2211     private static void release(MasterServiceState mss) {
2212       if (mss != null && mss.connection != null) {
2213         ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2214       }
2215     }
2216 
2217     private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
2218       if (mss.getStub() == null){
2219         return false;
2220       }
2221       try {
2222         return mss.isMasterRunning();
2223       } catch (UndeclaredThrowableException e) {
2224         // It's somewhat messy, but we can receive exceptions such as
2225         //  java.net.ConnectException that are not declared. So we catch them...
2226         LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
2227         return false;
2228       } catch (ServiceException se) {
2229         LOG.warn("Checking master connection", se);
2230         return false;
2231       }
2232     }
2233 
2234     void releaseMaster(MasterServiceState mss) {
2235       if (mss.getStub() == null) return;
2236       synchronized (masterAndZKLock) {
2237         --mss.userCount;
2238         if (mss.userCount <= 0) {
2239           mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
2240         }
2241       }
2242     }
2243 
2244     private void closeMasterService(MasterServiceState mss) {
2245       if (mss.getStub() != null) {
2246         LOG.info("Closing master protocol: " + mss);
2247         mss.clearStub();
2248       }
2249       mss.userCount = 0;
2250     }
2251 
2252     /**
2253      * Immediate close of the shared master. Can be by the delayed close or when closing the
2254      * connection itself.
2255      */
2256     private void closeMaster() {
2257       synchronized (masterAndZKLock) {
2258         closeMasterService(masterServiceState);
2259       }
2260     }
2261 
2262     void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
2263                               ServerName serverName, long seqNum) {
2264       HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
2265       cacheLocation(hri.getTable(), source, newHrl);
2266     }
2267 
2268     /**
2269      * Deletes the cached location of the region if necessary, based on some error from source.
2270      * @param hri The region in question.
2271      * @param source The source of the error that prompts us to invalidate cache.
2272      */
2273     void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
2274       ConcurrentMap<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
2275       tableLocations.remove(hri.getStartKey(), source);
2276     }
2277 
2278     @Override
2279     public void deleteCachedRegionLocation(final HRegionLocation location) {
2280       if (location == null) {
2281         return;
2282       }
2283 
2284       HRegionLocation removedLocation;
2285       TableName tableName = location.getRegionInfo().getTable();
2286       Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
2287       removedLocation = tableLocations.remove(location.getRegionInfo().getStartKey());
2288       if (LOG.isDebugEnabled() && removedLocation != null) {
2289         LOG.debug("Removed " +
2290             location.getRegionInfo().getRegionNameAsString() +
2291             " for tableName=" + tableName +
2292             " from cache");
2293       }
2294     }
2295 
2296     /**
2297      * Update the location with the new value (if the exception is a RegionMovedException)
2298      * or delete it from the cache. Does nothing if we can be sure from the exception that
2299      * the location is still accurate, or if the cache has already been updated.
2300      * @param exception an object (to simplify user code) on which we will try to find a nested
2301      *                  or wrapped or both RegionMovedException
2302      * @param source server that is the source of the location update.
2303      */
2304     @Override
2305     public void updateCachedLocations(final TableName tableName, byte[] rowkey,
2306       final Object exception, final HRegionLocation source) {
2307       if (rowkey == null || tableName == null) {
2308         LOG.warn("Coding error, see method javadoc. row="
2309             + (rowkey == null ? "null" : Bytes.toStringBinary(rowkey))
                 + ", tableName=" + (tableName == null ? "null" : tableName));
2310         return;
2311       }
2312 
2313       if (source == null || source.getServerName() == null){
2314         // This should not happen, but let's secure ourselves.
2315         return;
2316       }
2317 
2318       // Is it something we have already updated?
2319       final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
2320       if (oldLocation == null || !source.getServerName().equals(oldLocation.getServerName())) {
2321         // There is no such location in the cache (it's been removed already) or
2322         // the cache has already been refreshed with a different location.  => nothing to do
2323         return;
2324       }
2325 
2326       HRegionInfo regionInfo = oldLocation.getRegionInfo();
2327       Throwable cause = findException(exception);
2328       if (cause != null) {
2329         if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
2330           // We know that the region is still on this region server
2331           return;
2332         }
2333 
2334         if (cause instanceof RegionMovedException) {
2335           RegionMovedException rme = (RegionMovedException) cause;
2336           if (LOG.isTraceEnabled()) {
2337             LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2338                 rme.getHostname() + ":" + rme.getPort() +
2339                 " according to " + source.getHostnamePort());
2340           }
2341           // We know that the region is not anymore on this region server, but we know
2342           //  the new location.
2343           updateCachedLocation(
2344               regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2345           return;
2346         }
2347       }
2348 
2349       // If we're here, it means that we cannot be sure about the location, so we remove it from
2350       //  the cache.
2351       deleteCachedLocation(regionInfo, source);
2352     }
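
         // Caller-side sketch (illustrative): after a failed RPC against 'source',
         // hand the raw exception back so the cache can heal itself; the method digs
         // a wrapped RegionMovedException out of 'ioe' on its own:
         //
         //   try {
         //     // ... RPC to the region server held in 'source' ...
         //   } catch (IOException ioe) {
         //     connection.updateCachedLocations(tableName, row, ioe, source);
         //     // the cache now points at the new server, or the entry is gone
         //   }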
2353 
2354     @Override
2355     public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
2356       final Object exception, final HRegionLocation source) {
2357       updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
2358     }
2359 
2360     @Override
2361     @Deprecated
2362     public void processBatch(List<? extends Row> list,
2363         final TableName tableName,
2364         ExecutorService pool,
2365         Object[] results) throws IOException, InterruptedException {
2366       // This belongs in HTable!!! Not in here.  St.Ack
2367 
2368       // results must be the same size as list
2369       if (results.length != list.size()) {
2370         throw new IllegalArgumentException(
2371           "argument results must be the same size as argument list");
2372       }
2373       processBatchCallback(list, tableName, pool, results, null);
2374     }
2375 
2376     @Override
2377     @Deprecated
2378     public void processBatch(List<? extends Row> list,
2379         final byte[] tableName,
2380         ExecutorService pool,
2381         Object[] results) throws IOException, InterruptedException {
2382       processBatch(list, TableName.valueOf(tableName), pool, results);
2383     }
2384 
2385     /**
2386      * Send the queries in parallel on the different region servers. Retries on failures.
2387      * If the method returns without throwing, there was no error and the 'results' array
2388      * will contain no exceptions. On error, an exception is thrown and the 'results' array
2389      * will contain a mix of results and exceptions.
2390      * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
2391      */
2392     @Override
2393     @Deprecated
2394     public <R> void processBatchCallback(
2395       List<? extends Row> list,
2396       TableName tableName,
2397       ExecutorService pool,
2398       Object[] results,
2399       Batch.Callback<R> callback)
2400       throws IOException, InterruptedException {
2401 
2402       // To fulfill the original contract, we have a special callback. This callback
2403       //  will set the results in the Object array.
2404       ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
2405       AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
2406 
2407       // We're doing a submit all. This way, the originalIndex will match the initial list.
2408       asyncProcess.submitAll(list);
2409       asyncProcess.waitUntilDone();
2410 
2411       if (asyncProcess.hasError()) {
2412         throw asyncProcess.getErrors();
2413       }
2414     }
2415 
2416     @Override
2417     @Deprecated
2418     public <R> void processBatchCallback(
2419       List<? extends Row> list,
2420       byte[] tableName,
2421       ExecutorService pool,
2422       Object[] results,
2423       Batch.Callback<R> callback)
2424       throws IOException, InterruptedException {
2425       processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
2426     }
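
         // Usage sketch for the deprecated batch entry points above (illustrative;
         // the table name and 'pool' are hypothetical). results[i] ends up holding
         // either the server's result or the Throwable for list.get(i):
         //
         //   List<Put> puts = ...;  // each Put is a Row
         //   Object[] results = new Object[puts.size()];
         //   connection.processBatch(puts, TableName.valueOf("myTable"), pool, results);
         //   // new code should use HTable#batch / HTable#processBatchCallback instead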
2427 
2428     // For tests.
2429     protected <R> AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool,
2430            AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
2431       RpcControllerFactory controllerFactory = RpcControllerFactory.instantiate(conf);
2432       RpcRetryingCallerFactory callerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats);
2433       return new AsyncProcess<R>(this, tableName, pool, callback, conf, callerFactory,
2434         controllerFactory);
2435     }
2436 
2437     /**
2438      * Fill the result array for the interfaces using it.
2439      */
2440     private static class ObjectResultFiller<Res>
2441         implements AsyncProcess.AsyncProcessCallback<Res> {
2442 
2443       private final Object[] results;
2444       private Batch.Callback<Res> callback;
2445 
2446       ObjectResultFiller(Object[] results, Batch.Callback<Res> callback) {
2447         this.results = results;
2448         this.callback = callback;
2449       }
2450 
2451       @Override
2452       public void success(int pos, byte[] region, Row row, Res result) {
2453         assert pos < results.length;
2454         results[pos] = result;
2455         if (callback != null) {
2456           callback.update(region, row.getRow(), result);
2457         }
2458       }
2459 
2460       @Override
2461       public boolean failure(int pos, byte[] region, Row row, Throwable t) {
2462         assert pos < results.length;
2463         results[pos] = t;
2464         // Batch.Callback<Res> was not called on failure in 0.94. We keep this behavior.
2465         return true; // we want to have this failure in the failures list.
2466       }
2467 
2468       @Override
2469       public boolean retriableFailure(int originalIndex, Row row, byte[] region,
2470                                       Throwable exception) {
2471         return true; // we retry
2472       }
2473     }
2474 
2475     @Override
2476     public ServerStatisticTracker getStatisticsTracker() {
2477       return this.stats;
2478     }
2479 
2480     @Override
2481     public ClientBackoffPolicy getBackoffPolicy() {
2482       return this.backoffPolicy;
2483     }
2484 
2485     /*
2486      * Return the number of cached region locations for a table. It will only be called
2487      * from a unit test.
2488      */
2489     int getNumberOfCachedRegionLocations(final TableName tableName) {
2490       Map<byte[], HRegionLocation> tableLocs = this.cachedRegionLocations.get(tableName);
2491       if (tableLocs == null) {
2492         return 0;
2493       }
2494       return tableLocs.size();
2495     }
2496 
2497     /**
2498      * Check the region cache to see whether a region is cached yet or not.
2499      * Called by unit tests.
2500      * @param tableName tableName
2501      * @param row row
2502      * @return Region cached or not.
2503      */
2504     boolean isRegionCached(TableName tableName, final byte[] row) {
2505       HRegionLocation location = getCachedLocation(tableName, row);
2506       return location != null;
2507     }
2508 
2509     @Override
2510     public void setRegionCachePrefetch(final TableName tableName,
2511         final boolean enable) {
2512       if (!enable) {
2513         regionCachePrefetchDisabledTables.add(Bytes.mapKey(tableName.getName()));
2514       }
2515       else {
2516         regionCachePrefetchDisabledTables.remove(Bytes.mapKey(tableName.getName()));
2517       }
2518     }
2519 
2520     @Override
2521     public void setRegionCachePrefetch(final byte[] tableName,
2522         final boolean enable) {
2523       setRegionCachePrefetch(TableName.valueOf(tableName), enable);
2524     }
2525 
2526     @Override
2527     public boolean getRegionCachePrefetch(TableName tableName) {
2528       return usePrefetch &&
2529           !regionCachePrefetchDisabledTables.contains(Bytes.mapKey(tableName.getName()));
2530     }
2531 
2532     @Override
2533     public boolean getRegionCachePrefetch(byte[] tableName) {
2534       return getRegionCachePrefetch(TableName.valueOf(tableName));
2535     }
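         // A minimal, hedged usage sketch for the prefetch switch above
         // (illustrative only; "connection" is an assumed HConnection instance):
         //
         //   TableName t1 = TableName.valueOf("t1");
         //   connection.setRegionCachePrefetch(t1, false); // stop prefetching t1's regions
         //   // getRegionCachePrefetch(t1) now returns false
         //   connection.setRegionCachePrefetch(t1, true);  // re-enable prefetching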
2536 
2537     @Override
2538     public void abort(final String msg, Throwable t) {
2539       if (t instanceof KeeperException.SessionExpiredException
2540         && keepAliveZookeeper != null) {
2541         synchronized (masterAndZKLock) {
2542           if (keepAliveZookeeper != null) {
2543             LOG.warn("This client just lost its session with ZooKeeper," +
2544               " closing it." +
2545               " It will be recreated next time someone needs it", t);
2546             closeZooKeeperWatcher();
2547           }
2548         }
2549       } else {
2550         if (t != null) {
2551           LOG.fatal(msg, t);
2552         } else {
2553           LOG.fatal(msg);
2554         }
2555         this.aborted = true;
2556         close();
2557         this.closed = true;
2558       }
2559     }
2560 
2561     @Override
2562     public boolean isClosed() {
2563       return this.closed;
2564     }
2565 
2566     @Override
2567     public boolean isAborted(){
2568       return this.aborted;
2569     }
2570 
2571     @Override
2572     public int getCurrentNrHRS() throws IOException {
2573       return this.registry.getCurrentNrHRS();
2574     }
2575 
2576     /**
2577      * Increment this client's reference count.
2578      */
2579     void incCount() {
2580       ++refCount;
2581     }
2582 
2583     /**
2584      * Decrement this client's reference count.
2585      */
2586     void decCount() {
2587       if (refCount > 0) {
2588         --refCount;
2589       }
2590     }
2591 
2592     /**
2593      * Return whether this client has no references.
2594      *
2595      * @return true if this client has no references; false otherwise
2596      */
2597     boolean isZeroReference() {
2598       return refCount == 0;
2599     }
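         // A hedged summary of the reference-counting protocol (descriptive, not
         // normative): HConnectionManager increments the count each time it hands
         // out a shared connection and decrements it on deleteConnection(); the
         // underlying resources are only released once isZeroReference() holds.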
2600 
2601     void internalClose() {
2602       if (this.closed) {
2603         return;
2604       }
2605       delayedClosing.stop("Closing connection");
2606       closeMaster();
2607       shutdownBatchPool();
2608       this.closed = true;
2609       closeZooKeeperWatcher();
2610       this.stubs.clear();
2611       if (clusterStatusListener != null) {
2612         clusterStatusListener.close();
2613       }
2614       if (rpcClient != null) {
2615         rpcClient.stop();
2616       }
2617     }
2618 
2619     @Override
2620     public void close() {
2621       if (managed) {
2622         if (aborted) {
2623           HConnectionManager.deleteStaleConnection(this);
2624         } else {
2625           HConnectionManager.deleteConnection(this, false);
2626         }
2627       } else {
2628         internalClose();
2629       }
2630     }
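         // Note (summary, not normative): a "managed" connection is owned by
         // HConnectionManager, so close() hands the decision back to it via
         // deleteConnection(); an unmanaged one is torn down immediately through
         // internalClose().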
2631 
2632     /**
2633      * Close the connection for good, regardless of what the current value of
2634      * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
2635      * point, which would be the case if all of its consumers close the
2636      * connection. However, on the off chance that someone is unable to close
2637      * the connection, perhaps because it bailed out prematurely, the method
2638      * below will ensure that this {@link HConnection} instance is cleaned up.
2639      * Caveat: The JVM may take an unknown amount of time to call finalize on an
2640      * unreachable object, so our hope is that every consumer cleans up after
2641      * itself, like any good citizen.
2642      */
2643     @Override
2644     protected void finalize() throws Throwable {
2645       super.finalize();
2646       // Pretend we are about to release the last remaining reference.
2647       refCount = 1;
2648       close();
2649     }
2650 
2651     @Override
2652     public HTableDescriptor[] listTables() throws IOException {
2653       MasterKeepAliveConnection master = getKeepAliveMasterService();
2654       try {
2655         GetTableDescriptorsRequest req =
2656           RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
2657         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2658       } catch (ServiceException se) {
2659         throw ProtobufUtil.getRemoteException(se);
2660       } finally {
2661         master.close();
2662       }
2663     }
2664 
2665     @Override
2666     public String[] getTableNames() throws IOException {
2667       TableName[] tableNames = listTableNames();
2668       String[] result = new String[tableNames.length];
2669       for (int i = 0; i < tableNames.length; i++) {
2670         result[i] = tableNames[i].getNameAsString();
2671       }
2672       return result;
2673     }
2674 
2675     @Override
2676     public TableName[] listTableNames() throws IOException {
2677       MasterKeepAliveConnection master = getKeepAliveMasterService();
2678       try {
2679         return ProtobufUtil.getTableNameArray(master.getTableNames(null,
2680             GetTableNamesRequest.newBuilder().build())
2681           .getTableNamesList());
2682       } catch (ServiceException se) {
2683         throw ProtobufUtil.getRemoteException(se);
2684       } finally {
2685         master.close();
2686       }
2687     }
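         // A minimal, hedged usage sketch for the listing methods above (assumes a
         // valid Configuration "conf"; error handling elided):
         //
         //   HConnection conn = HConnectionManager.createConnection(conf);
         //   try {
         //     for (TableName tn : conn.listTableNames()) {
         //       System.out.println(tn.getNameAsString());
         //     }
         //   } finally {
         //     conn.close();
         //   }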
2688 
2689     @Override
2690     public HTableDescriptor[] getHTableDescriptorsByTableName(
2691         List<TableName> tableNames) throws IOException {
2692       if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
2693       MasterKeepAliveConnection master = getKeepAliveMasterService();
2694       try {
2695         GetTableDescriptorsRequest req =
2696           RequestConverter.buildGetTableDescriptorsRequest(tableNames);
2697         return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
2698       } catch (ServiceException se) {
2699         throw ProtobufUtil.getRemoteException(se);
2700       } finally {
2701         master.close();
2702       }
2703     }
2704 
2705     @Override
2706     public HTableDescriptor[] getHTableDescriptors(
2707         List<String> names) throws IOException {
2708       List<TableName> tableNames = new ArrayList<TableName>(names.size());
2709       for(String name : names) {
2710         tableNames.add(TableName.valueOf(name));
2711       }
2712 
2713       return getHTableDescriptorsByTableName(tableNames);
2714     }
2715 
2716     @Override
2717     public NonceGenerator getNonceGenerator() {
2718       return this.nonceGenerator;
2719     }
2720 
2721     /**
2722      * Connects to the master to get the table descriptor.
2723      * @param tableName table name
2724      * @return the table descriptor, or null if the passed tableName is null.
2725      * @throws IOException if the connection to master fails or if the table
2726      *  is not found.
2727      */
2728     @Override
2729     public HTableDescriptor getHTableDescriptor(final TableName tableName)
2730     throws IOException {
2731       if (tableName == null) return null;
2732       MasterKeepAliveConnection master = getKeepAliveMasterService();
2733       GetTableDescriptorsResponse htds;
2734       try {
2735         GetTableDescriptorsRequest req =
2736           RequestConverter.buildGetTableDescriptorsRequest(tableName);
2737         htds = master.getTableDescriptors(null, req);
2738       } catch (ServiceException se) {
2739         throw ProtobufUtil.getRemoteException(se);
2740       } finally {
2741         master.close();
2742       }
2743       if (!htds.getTableSchemaList().isEmpty()) {
2744         return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
2745       }
2746       throw new TableNotFoundException(tableName.getNameAsString());
2747     }
2748 
2749     @Override
2750     public HTableDescriptor getHTableDescriptor(final byte[] tableName)
2751     throws IOException {
2752       return getHTableDescriptor(TableName.valueOf(tableName));
2753     }
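         // Hedged example of fetching a single descriptor (illustrative only;
         // "connection" is an assumed HConnection instance):
         //
         //   HTableDescriptor htd =
         //       connection.getHTableDescriptor(TableName.valueOf("t1"));
         //   // a missing table surfaces as TableNotFoundException, not null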
2754   }
2755 
2756   /**
2757    * The record of errors for servers.
2758    */
2759   static class ServerErrorTracker {
2760     // We need a concurrent map here, as we could have multiple threads updating it in parallel.
2761     private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
2762         new ConcurrentHashMap<HRegionLocation, ServerErrors>();
2763     private final long canRetryUntil;
2764     private final int maxRetries;
2765     private final String startTrackingTime;
2766 
2767     public ServerErrorTracker(long timeout, int maxRetries) {
2768       this.maxRetries = maxRetries;
2769       this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
2770       this.startTrackingTime = new Date().toString();
2771     }
2772 
2773     /**
2774      * We stop retrying when we have exhausted BOTH the number of retries and the time allocated.
2775      */
2776     boolean canRetryMore(int numRetry) {
2777       // If only a single try is configured, we must not take the time into account.
2778       return numRetry < maxRetries || (maxRetries > 1 &&
2779           EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil);
2780     }
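         // Worked example: with maxRetries = 3, retries 0..2 are always allowed;
         // later retries are allowed only while the deadline has not passed (and
         // never when maxRetries <= 1, where the time is ignored by design).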
2781 
2782     /**
2783      * Calculates the back-off time for a retrying request to a particular server.
2784      *
2785      * @param server    The server in question.
2786      * @param basePause The default HConnectionImplementation (hci) pause.
2787      * @return The time to wait before sending next request.
2788      */
2789     long calculateBackoffTime(HRegionLocation server, long basePause) {
2790       long result;
2791       ServerErrors errorStats = errorsByServer.get(server);
2792       if (errorStats != null) {
2793         result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2794       } else {
2795         result = 0; // yes, if the server is not in our list we don't wait before retrying.
2796       }
2797       return result;
2798     }
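         // Illustrative numbers only: ConnectionUtils.getPauseTime() scales the
         // base pause by a per-retry backoff table, so with basePause = 100ms the
         // wait grows roughly as 100ms, 200ms, 300ms, 500ms, 1s, ... as errors
         // accumulate for that server (exact values depend on HConstants.RETRY_BACKOFF).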
2799 
2800     /**
2801      * Reports an error on the given server so we can do whatever bean-counting is necessary.
2802      *
2803      * @param server The server in question.
2804      */
2805     void reportServerError(HRegionLocation server) {
2806       ServerErrors errors = errorsByServer.get(server);
2807       if (errors != null) {
2808         errors.addError();
2809       } else {
2810         errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2811         if (errors != null){
2812           errors.addError();
2813         }
2814       }
2815     }
2816 
2817     String getStartTrackingTime() {
2818       return startTrackingTime;
2819     }
2820 
2821     /**
2822      * The record of errors for a server.
2823      */
2824     private static class ServerErrors {
2825       public final AtomicInteger retries = new AtomicInteger(0);
2826 
2827       public void addError() {
2828         retries.incrementAndGet();
2829       }
2830     }
2831   }
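       // A hedged sketch of how a retrying caller might drive the tracker above
       // (illustrative only; "location", "pause", and the call itself are assumed,
       // and exception handling is elided):
       //
       //   ServerErrorTracker tracker = new ServerErrorTracker(60000, 10);
       //   for (int tries = 0; tracker.canRetryMore(tries); tries++) {
       //     try {
       //       // ... attempt the call against "location" ...
       //       break;
       //     } catch (IOException e) {
       //       tracker.reportServerError(location);
       //       Thread.sleep(tracker.calculateBackoffTime(location, pause));
       //     }
       //   }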
2832 
2833   /**
2834    * Look for an exception we know how to handle inside the remote exception:
2835    * - hadoop.ipc wrapped exceptions
2836    * - nested exceptions
2837    *
2838    * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException
2839    * @return null if we didn't find the exception, the exception otherwise.
2840    */
2841   public static Throwable findException(Object exception) {
2842     if (!(exception instanceof Throwable)) { // instanceof is false for null
2843       return null;
2844     }
2845     Throwable cur = (Throwable) exception;
2846     while (cur != null) {
2847       if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
2848           || cur instanceof RegionTooBusyException) {
2849         return cur;
2850       }
2851       if (cur instanceof RemoteException) {
2852         RemoteException re = (RemoteException) cur;
2853         cur = re.unwrapRemoteException(
2854             RegionOpeningException.class, RegionMovedException.class,
2855             RegionTooBusyException.class);
2856         if (cur == null) {
2857           cur = re.unwrapRemoteException();
2858         }
2859         // unwrapRemoteException can return the exception given as a parameter when it cannot
2860         //  unwrap it. In this case, there is no need to look further.
2861         // noinspection ObjectEquality
2862         if (cur == re) {
2863           return null;
2864         }
2865       } else {
2866         cur = cur.getCause();
2867       }
2868     }
2869 
2870     return null;
2871   }
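       // A hedged sketch of typical use (illustrative only; "remoteError" is an
       // assumed Throwable from a failed RPC):
       //
       //   Throwable cause = HConnectionManager.findException(remoteError);
       //   if (cause instanceof RegionMovedException) {
       //     // the cached HRegionLocation is stale; refresh it before retrying
       //   }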
2872 
2873   /**
2874    * Set the number of retries to use server-side when trying to communicate
2875    * with another server over {@link HConnection}.  Used when updating catalog
2876    * tables, etc.  Call this method before we create any Connections.
2877    * @param c The Configuration instance to set the retries into.
        * @param sn Prefix to use in the log message (e.g. the server name).
2878    * @param log Used to log what we set in here.
2879    */
2880   public static void setServerSideHConnectionRetries(final Configuration c, final String sn,
2881       final Log log) {
2882     int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2883       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2884     // Go big.  Multiply by 10.  If we can't get to meta after this many retries
2885     // then something is seriously wrong.
2886     int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
2887     int retries = hcRetries * serversideMultiplier;
2888     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
2889     log.debug(sn + " HConnection server-to-server retries=" + retries);
2890   }
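       // Worked example (illustrative; actual defaults are version-dependent): with
       // a client retry count of 10 and the default multiplier of 10, the method
       // above leaves HBASE_CLIENT_RETRIES_NUMBER set to 100 for server-to-server calls.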
2891 }