View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.hbase.classification.InterfaceAudience;
44  import org.apache.hadoop.hbase.classification.InterfaceStability;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.Cell;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HRegionLocation;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.KeyValue;
54  import org.apache.hadoop.hbase.KeyValueUtil;
55  import org.apache.hadoop.hbase.ServerName;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch;
58  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59  import org.apache.hadoop.hbase.filter.BinaryComparator;
60  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.RequestConverter;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
71  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.Pair;
75  import org.apache.hadoop.hbase.util.Threads;
76  
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   * <p>Used to communicate with a single HBase table.  An implementation of
84   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
85   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
86   * See {@link HConnectionManager} class comment for an example.
87   *
88   * <p>This class is not thread safe for reads or writes.
89   *
90   * <p>In case of writes (Put, Delete), the underlying write buffer can
91   * be corrupted if multiple threads contend over a single HTable instance.
92   *
93   * <p>In case of reads, some fields used by a Scan are shared among all threads.
94   * The HTable implementation does not guarantee thread safety for a Get either.
95   *
96   * <p>Instances of HTable passed the same {@link Configuration} instance will
97   * share connections to servers out on the cluster and to the zookeeper ensemble
98   * as well as caches of region locations.  This is usually a *good* thing and it
99   * is recommended to reuse the same configuration object for all your tables.
100  * This happens because they will all share the same underlying
101  * {@link HConnection} instance. See {@link HConnectionManager} for more on
102  * how this mechanism works.
103  *
104  * <p>{@link HConnection} will read most of the
105  * configuration it needs from the passed {@link Configuration} on initial
106  * construction.  Thereafter, for settings such as
107  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
108  * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
109  * passed {@link Configuration} subsequent to {@link HConnection} construction
110  * will go unnoticed.  To run with changed values, make a new
111  * {@link HTable} passing a new {@link Configuration} instance that has the
112  * new configuration.
113  *
114  * <p>Note that this class implements the {@link Closeable} interface. When a
115  * HTable instance is no longer required, it *should* be closed in order to ensure
116  * that the underlying resources are promptly released. Please note that the close
117  * method can throw java.io.IOException that must be handled.
118  *
119  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
120  * @see HConnection
121  * @see HConnectionManager
122  */
123 @InterfaceAudience.Public
124 @InterfaceStability.Stable
125 public class HTable implements HTableInterface {
126   private static final Log LOG = LogFactory.getLog(HTable.class);
127   protected HConnection connection;
128   private final TableName tableName;
129   private volatile Configuration configuration;
130   private TableConfiguration tableConfiguration;
131   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
132   private long writeBufferSize;
133   private boolean clearBufferOnFail;
134   private boolean autoFlush;
135   protected long currentWriteBufferSize;
136   protected int scannerCaching;
137   private ExecutorService pool;  // For Multi
138   private boolean closed;
139   private int operationTimeout;
140   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
141   private final boolean cleanupConnectionOnClose; // close the connection in close()
142 
143   /** The Async process for puts with autoflush set to false or multiputs */
144   protected AsyncProcess<Object> ap;
145   private RpcRetryingCallerFactory rpcCallerFactory;
146   private RpcControllerFactory rpcControllerFactory;
147 
148   /**
149    * Creates an object to access a HBase table.
150    * Shares zookeeper connection and other resources with other HTable instances
151    * created with the same <code>conf</code> instance.  Uses already-populated
152    * region cache if one is available, populated by any other HTable instances
153    * sharing this <code>conf</code> instance.  Recommended.
154    * @param conf Configuration object to use.
155    * @param tableName Name of the table.
156    * @throws IOException if a remote or network exception occurs
157    */
158   public HTable(Configuration conf, final String tableName)
159   throws IOException {
160     this(conf, TableName.valueOf(tableName));
161   }
162 
163   /**
164    * Creates an object to access a HBase table.
165    * Shares zookeeper connection and other resources with other HTable instances
166    * created with the same <code>conf</code> instance.  Uses already-populated
167    * region cache if one is available, populated by any other HTable instances
168    * sharing this <code>conf</code> instance.  Recommended.
169    * @param conf Configuration object to use.
170    * @param tableName Name of the table.
171    * @throws IOException if a remote or network exception occurs
172    */
173   public HTable(Configuration conf, final byte[] tableName)
174   throws IOException {
175     this(conf, TableName.valueOf(tableName));
176   }
177 
178 
179 
180   /**
181    * Creates an object to access a HBase table.
182    * Shares zookeeper connection and other resources with other HTable instances
183    * created with the same <code>conf</code> instance.  Uses already-populated
184    * region cache if one is available, populated by any other HTable instances
185    * sharing this <code>conf</code> instance.  Recommended.
186    * @param conf Configuration object to use.
187    * @param tableName table name pojo
188    * @throws IOException if a remote or network exception occurs
189    */
190   public HTable(Configuration conf, final TableName tableName)
191   throws IOException {
192     this.tableName = tableName;
193     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
194     if (conf == null) {
195       this.connection = null;
196       return;
197     }
198     this.connection = HConnectionManager.getConnection(conf);
199     this.configuration = conf;
200 
201     this.pool = getDefaultExecutor(conf);
202     this.finishSetup();
203   }
204 
205   /**
206    * Creates an object to access a HBase table. Shares zookeeper connection and other resources with
207    * other HTable instances created with the same <code>connection</code> instance. Use this
208    * constructor when the HConnection instance is externally managed.
209    * @param tableName Name of the table.
210    * @param connection HConnection to be used.
211    * @throws IOException if a remote or network exception occurs
212    */
213   public HTable(TableName tableName, HConnection connection) throws IOException {
214     this.tableName = tableName;
215     this.cleanupPoolOnClose = true;
216     this.cleanupConnectionOnClose = false;
217     this.connection = connection;
218     this.configuration = connection.getConfiguration();
219 
220     this.pool = getDefaultExecutor(this.configuration);
221     this.finishSetup();
222   }
223    
224   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
225     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
226     if (maxThreads == 0) {
227       maxThreads = 1; // is there a better default?
228     }
229     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
230 
231     // Using the "direct handoff" approach, new threads will only be created
232     // if it is necessary and will grow unbounded. This could be bad but in HCM
233     // we only create as many Runnables as there are region servers. It means
234     // it also scales when new region servers are added.
235     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
236         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
237     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
238     return pool;
239   }
240 
241   /**
242    * Creates an object to access a HBase table.
243    * Shares zookeeper connection and other resources with other HTable instances
244    * created with the same <code>conf</code> instance.  Uses already-populated
245    * region cache if one is available, populated by any other HTable instances
246    * sharing this <code>conf</code> instance.
247    * Use this constructor when the ExecutorService is externally managed.
248    * @param conf Configuration object to use.
249    * @param tableName Name of the table.
250    * @param pool ExecutorService to be used.
251    * @throws IOException if a remote or network exception occurs
252    */
253   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
254       throws IOException {
255     this(conf, TableName.valueOf(tableName), pool);
256   }
257 
258   /**
259    * Creates an object to access a HBase table.
260    * Shares zookeeper connection and other resources with other HTable instances
261    * created with the same <code>conf</code> instance.  Uses already-populated
262    * region cache if one is available, populated by any other HTable instances
263    * sharing this <code>conf</code> instance.
264    * Use this constructor when the ExecutorService is externally managed.
265    * @param conf Configuration object to use.
266    * @param tableName Name of the table.
267    * @param pool ExecutorService to be used.
268    * @throws IOException if a remote or network exception occurs
269    */
270   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
271       throws IOException {
272     this.connection = HConnectionManager.getConnection(conf);
273     this.configuration = conf;
274     this.pool = pool;
275     this.tableName = tableName;
276     this.cleanupPoolOnClose = false;
277     this.cleanupConnectionOnClose = true;
278 
279     this.finishSetup();
280   }
281 
282   /**
283    * Creates an object to access a HBase table.
284    * Shares zookeeper connection and other resources with other HTable instances
285    * created with the same <code>connection</code> instance.
286    * Use this constructor when the ExecutorService and HConnection instance are
287    * externally managed.
288    * @param tableName Name of the table.
289    * @param connection HConnection to be used.
290    * @param pool ExecutorService to be used.
291    * @throws IOException if a remote or network exception occurs
292    */
293   public HTable(final byte[] tableName, final HConnection connection,
294       final ExecutorService pool) throws IOException {
295     this(TableName.valueOf(tableName), connection, pool);
296   }
297 
298   /**
299    * Creates an object to access a HBase table.
300    * Shares zookeeper connection and other resources with other HTable instances
301    * created with the same <code>connection</code> instance.
302    * Use this constructor when the ExecutorService and HConnection instance are
303    * externally managed.
304    * @param tableName Name of the table.
305    * @param connection HConnection to be used.
306    * @param pool ExecutorService to be used.
307    * @throws IOException if a remote or network exception occurs
308    */
309   public HTable(TableName tableName, final HConnection connection,
310       final ExecutorService pool) throws IOException {
311     this(tableName, connection, null, null, null, pool);
312   }
313 
314   /**
315    * Creates an object to access a HBase table.
316    * Shares zookeeper connection and other resources with other HTable instances
317    * created with the same <code>connection</code> instance.
318    * Use this constructor when the ExecutorService and HConnection instance are
319    * externally managed.
320    * @param tableName Name of the table.
321    * @param connection HConnection to be used.
322    * @param tableConfig table configuration
323    * @param rpcCallerFactory RPC caller factory
324    * @param rpcControllerFactory RPC controller factory
325    * @param pool ExecutorService to be used.
326    * @throws IOException if a remote or network exception occurs
327    */
328   public HTable(TableName tableName, final HConnection connection,
329       final TableConfiguration tableConfig,
330       final RpcRetryingCallerFactory rpcCallerFactory,
331       final RpcControllerFactory rpcControllerFactory,
332       final ExecutorService pool) throws IOException {
333     if (connection == null || connection.isClosed()) {
334       throw new IllegalArgumentException("Connection is null or closed.");
335     }
336     this.tableName = tableName;
337     this.connection = connection;
338     this.configuration = connection.getConfiguration();
339     this.tableConfiguration = tableConfig;
340     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
341     this.pool = pool;
342 
343     this.rpcCallerFactory = rpcCallerFactory;
344     this.rpcControllerFactory = rpcControllerFactory;
345 
346     this.finishSetup();
347   }
348 
349   /**
350    * For internal testing.
351    */
352   protected HTable(){
353     tableName = null;
354     tableConfiguration = new TableConfiguration();
355     cleanupPoolOnClose = false;
356     cleanupConnectionOnClose = false;
357   }
358 
359   /**
360    * @return maxKeyValueSize from configuration.
361    */
362   public static int getMaxKeyValueSize(Configuration conf) {
363     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
364   }
365 
366   /**
367    * setup this HTable's parameter based on the passed configuration
368    */
369   private void finishSetup() throws IOException {
370     if (tableConfiguration == null) {
371       tableConfiguration = new TableConfiguration(configuration);
372     }
373     this.operationTimeout = tableName.isSystemTable() ?
374       tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
375     this.writeBufferSize = tableConfiguration.getWriteBufferSize();
376     this.clearBufferOnFail = true;
377     this.autoFlush = true;
378     this.currentWriteBufferSize = 0;
379     this.scannerCaching = tableConfiguration.getScannerCaching();
380 
381     if (this.rpcCallerFactory == null) {
382       this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration,
383         this.connection.getStatisticsTracker());
384     }
385     if (this.rpcControllerFactory == null) {
386       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
387     }
388 
389     ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration,
390       rpcCallerFactory, rpcControllerFactory);
391 
392     this.closed = false;
393   }
394 
395   /**
396    * {@inheritDoc}
397    */
398   @Override
399   public Configuration getConfiguration() {
400     return configuration;
401   }
402 
403   /**
404    * Tells whether or not a table is enabled or not. This method creates a
405    * new HBase configuration, so it might make your unit tests fail due to
406    * incorrect ZK client port.
407    * @param tableName Name of table to check.
408    * @return {@code true} if table is online.
409    * @throws IOException if a remote or network exception occurs
410 	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
411    */
412   @Deprecated
413   public static boolean isTableEnabled(String tableName) throws IOException {
414     return isTableEnabled(TableName.valueOf(tableName));
415   }
416 
417   /**
418    * Tells whether or not a table is enabled or not. This method creates a
419    * new HBase configuration, so it might make your unit tests fail due to
420    * incorrect ZK client port.
421    * @param tableName Name of table to check.
422    * @return {@code true} if table is online.
423    * @throws IOException if a remote or network exception occurs
424 	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
425    */
426   @Deprecated
427   public static boolean isTableEnabled(byte[] tableName) throws IOException {
428     return isTableEnabled(TableName.valueOf(tableName));
429   }
430 
431   /**
432    * Tells whether or not a table is enabled or not. This method creates a
433    * new HBase configuration, so it might make your unit tests fail due to
434    * incorrect ZK client port.
435    * @param tableName Name of table to check.
436    * @return {@code true} if table is online.
437    * @throws IOException if a remote or network exception occurs
438    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
439    */
440   @Deprecated
441   public static boolean isTableEnabled(TableName tableName) throws IOException {
442     return isTableEnabled(HBaseConfiguration.create(), tableName);
443   }
444 
445   /**
446    * Tells whether or not a table is enabled or not.
447    * @param conf The Configuration object to use.
448    * @param tableName Name of table to check.
449    * @return {@code true} if table is online.
450    * @throws IOException if a remote or network exception occurs
451 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
452    */
453   @Deprecated
454   public static boolean isTableEnabled(Configuration conf, String tableName)
455   throws IOException {
456     return isTableEnabled(conf, TableName.valueOf(tableName));
457   }
458 
459   /**
460    * Tells whether or not a table is enabled or not.
461    * @param conf The Configuration object to use.
462    * @param tableName Name of table to check.
463    * @return {@code true} if table is online.
464    * @throws IOException if a remote or network exception occurs
465 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
466    */
467   @Deprecated
468   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
469   throws IOException {
470     return isTableEnabled(conf, TableName.valueOf(tableName));
471   }
472 
473   /**
474    * Tells whether or not a table is enabled or not.
475    * @param conf The Configuration object to use.
476    * @param tableName Name of table to check.
477    * @return {@code true} if table is online.
478    * @throws IOException if a remote or network exception occurs
479    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
480    */
481   @Deprecated
482   public static boolean isTableEnabled(Configuration conf,
483       final TableName tableName) throws IOException {
484     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
485       @Override
486       public Boolean connect(HConnection connection) throws IOException {
487         return connection.isTableEnabled(tableName);
488       }
489     });
490   }
491 
492   /**
493    * Find region location hosting passed row using cached info
494    * @param row Row to find.
495    * @return The location of the given row.
496    * @throws IOException if a remote or network exception occurs
497    */
498   public HRegionLocation getRegionLocation(final String row)
499   throws IOException {
500     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
501   }
502 
503   /**
504    * Finds the region on which the given row is being served. Does not reload the cache.
505    * @param row Row to find.
506    * @return Location of the row.
507    * @throws IOException if a remote or network exception occurs
508    */
509   public HRegionLocation getRegionLocation(final byte [] row)
510   throws IOException {
511     return connection.getRegionLocation(tableName, row, false);
512   }
513 
514   /**
515    * Finds the region on which the given row is being served.
516    * @param row Row to find.
517    * @param reload true to reload information or false to use cached information
518    * @return Location of the row.
519    * @throws IOException if a remote or network exception occurs
520    */
521   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
522   throws IOException {
523     return connection.getRegionLocation(tableName, row, reload);
524   }
525 
526   /**
527    * {@inheritDoc}
528    */
529   @Override
530   public byte [] getTableName() {
531     return this.tableName.getName();
532   }
533 
534   @Override
535   public TableName getName() {
536     return tableName;
537   }
538 
539   /**
540    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
541    * manipulations.
542    * @return An HConnection instance.
543    * @deprecated This method will be changed from public to package protected.
544    */
545   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
546   @Deprecated
547   public HConnection getConnection() {
548     return this.connection;
549   }
550 
551   /**
552    * Gets the number of rows that a scanner will fetch at once.
553    * <p>
554    * The default value comes from {@code hbase.client.scanner.caching}.
555    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
556    */
557   @Deprecated
558   public int getScannerCaching() {
559     return scannerCaching;
560   }
561 
562   /**
563    * Kept in 0.96 for backward compatibility
564    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
565    */
566   @Deprecated
567   public List<Row> getWriteBuffer() {
568     return writeAsyncBuffer;
569   }
570 
571   /**
572    * Sets the number of rows that a scanner will fetch at once.
573    * <p>
574    * This will override the value specified by
575    * {@code hbase.client.scanner.caching}.
576    * Increasing this value will reduce the amount of work needed each time
577    * {@code next()} is called on a scanner, at the expense of memory use
578    * (since more rows will need to be maintained in memory by the scanners).
579    * @param scannerCaching the number of rows a scanner will fetch at once.
580    * @deprecated Use {@link Scan#setCaching(int)}
581    */
582   @Deprecated
583   public void setScannerCaching(int scannerCaching) {
584     this.scannerCaching = scannerCaching;
585   }
586 
587   /**
588    * {@inheritDoc}
589    */
590   @Override
591   public HTableDescriptor getTableDescriptor() throws IOException {
592     return new UnmodifyableHTableDescriptor(
593       this.connection.getHTableDescriptor(this.tableName));
594   }
595 
596   /**
597    * Gets the starting row key for every region in the currently open table.
598    * <p>
599    * This is mainly useful for the MapReduce integration.
600    * @return Array of region starting row keys
601    * @throws IOException if a remote or network exception occurs
602    */
603   public byte [][] getStartKeys() throws IOException {
604     return getStartEndKeys().getFirst();
605   }
606 
607   /**
608    * Gets the ending row key for every region in the currently open table.
609    * <p>
610    * This is mainly useful for the MapReduce integration.
611    * @return Array of region ending row keys
612    * @throws IOException if a remote or network exception occurs
613    */
614   public byte[][] getEndKeys() throws IOException {
615     return getStartEndKeys().getSecond();
616   }
617 
618   /**
619    * Gets the starting and ending row keys for every region in the currently
620    * open table.
621    * <p>
622    * This is mainly useful for the MapReduce integration.
623    * @return Pair of arrays of region starting and ending row keys
624    * @throws IOException if a remote or network exception occurs
625    */
626   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
627     NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
628     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
629     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
630 
631     for (HRegionInfo region : regions.keySet()) {
632       startKeyList.add(region.getStartKey());
633       endKeyList.add(region.getEndKey());
634     }
635 
636     return new Pair<byte [][], byte [][]>(
637       startKeyList.toArray(new byte[startKeyList.size()][]),
638       endKeyList.toArray(new byte[endKeyList.size()][]));
639   }
640 
641   /**
642    * Gets all the regions and their address for this table.
643    * <p>
644    * This is mainly useful for the MapReduce integration.
645    * @return A map of HRegionInfo with it's server address
646    * @throws IOException if a remote or network exception occurs
647    */
648   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
649     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
650     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
651   }
652 
653   /**
654    * Get the corresponding regions for an arbitrary range of keys.
655    * <p>
656    * @param startKey Starting row in range, inclusive
657    * @param endKey Ending row in range, exclusive
658    * @return A list of HRegionLocations corresponding to the regions that
659    * contain the specified range
660    * @throws IOException if a remote or network exception occurs
661    */
662   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
663     final byte [] endKey) throws IOException {
664     return getRegionsInRange(startKey, endKey, false);
665   }
666 
667   /**
668    * Get the corresponding regions for an arbitrary range of keys.
669    * <p>
670    * @param startKey Starting row in range, inclusive
671    * @param endKey Ending row in range, exclusive
672    * @param reload true to reload information or false to use cached information
673    * @return A list of HRegionLocations corresponding to the regions that
674    * contain the specified range
675    * @throws IOException if a remote or network exception occurs
676    */
677   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
678       final byte [] endKey, final boolean reload) throws IOException {
679     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
680   }
681 
682   /**
683    * Get the corresponding start keys and regions for an arbitrary range of
684    * keys.
685    * <p>
686    * @param startKey Starting row in range, inclusive
687    * @param endKey Ending row in range
688    * @param includeEndKey true if endRow is inclusive, false if exclusive
689    * @return A pair of list of start keys and list of HRegionLocations that
690    *         contain the specified range
691    * @throws IOException if a remote or network exception occurs
692    */
693   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
694       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
695       throws IOException {
696     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
697   }
698 
699   /**
700    * Get the corresponding start keys and regions for an arbitrary range of
701    * keys.
702    * <p>
703    * @param startKey Starting row in range, inclusive
704    * @param endKey Ending row in range
705    * @param includeEndKey true if endRow is inclusive, false if exclusive
706    * @param reload true to reload information or false to use cached information
707    * @return A pair of list of start keys and list of HRegionLocations that
708    *         contain the specified range
709    * @throws IOException if a remote or network exception occurs
710    */
711   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
712       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
713       final boolean reload) throws IOException {
714     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
715     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
716       throw new IllegalArgumentException(
717         "Invalid range: " + Bytes.toStringBinary(startKey) +
718         " > " + Bytes.toStringBinary(endKey));
719     }
720     List<byte[]> keysInRange = new ArrayList<byte[]>();
721     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
722     byte[] currentKey = startKey;
723     do {
724       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
725       keysInRange.add(currentKey);
726       regionsInRange.add(regionLocation);
727       currentKey = regionLocation.getRegionInfo().getEndKey();
728     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
729         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
730             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
731     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
732         regionsInRange);
733   }
734 
735   /**
736    * {@inheritDoc}
737    */
738    @Override
739    public Result getRowOrBefore(final byte[] row, final byte[] family)
740    throws IOException {
741      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742          tableName, row) {
743        public Result call() throws IOException {
744             return ProtobufUtil.getRowOrBefore(getStub(), getLocation().getRegionInfo()
745                 .getRegionName(), row, family, rpcControllerFactory.newController());
746           }
747      };
748     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
749   }
750 
751    /**
752     * {@inheritDoc}
753     */
754   @Override
755   public ResultScanner getScanner(final Scan scan) throws IOException {
756     if (scan.getBatch() > 0 && scan.isSmall()) {
757       throw new IllegalArgumentException("Small scan should not be used with batching");
758     }
759     if (scan.getCaching() <= 0) {
760       scan.setCaching(getScannerCaching());
761     }
762 
763     if (scan.isReversed()) {
764       if (scan.isSmall()) {
765         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
766             this.connection);
767       } else {
768         return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection);
769       }
770     }
771 
772     if (scan.isSmall()) {
773       return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
774     } else {
775       return new ClientScanner(getConfiguration(), scan, getName(), this.connection);
776     }
777   }
778 
779   /**
780    * {@inheritDoc}
781    */
782   @Override
783   public ResultScanner getScanner(byte [] family) throws IOException {
784     Scan scan = new Scan();
785     scan.addFamily(family);
786     return getScanner(scan);
787   }
788 
789   /**
790    * {@inheritDoc}
791    */
792   @Override
793   public ResultScanner getScanner(byte [] family, byte [] qualifier)
794   throws IOException {
795     Scan scan = new Scan();
796     scan.addColumn(family, qualifier);
797     return getScanner(scan);
798   }
799 
800   /**
801    * {@inheritDoc}
802    */
803   @Override
804   public Result get(final Get get) throws IOException {
805     // have to instanatiate this and set the priority here since in protobuf util we don't pass in
806     // the tablename... an unfortunate side-effect of public interfaces :-/ In 0.99+ we put all the
807     // logic back into HTable
808     final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
809     controller.setPriority(tableName);
810     RegionServerCallable<Result> callable =
811         new RegionServerCallable<Result>(this.connection, getName(), get.getRow()) {
812           public Result call() throws IOException {
813             return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get,
814               controller);
815           }
816         };
817     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
818   }
819 
820   /**
821    * {@inheritDoc}
822    */
823   @Override
824   public Result[] get(List<Get> gets) throws IOException {
825     if (gets.size() == 1) {
826       return new Result[]{get(gets.get(0))};
827     }
828     try {
829       Object [] r1 = batch((List)gets);
830 
831       // translate.
832       Result [] results = new Result[r1.length];
833       int i=0;
834       for (Object o : r1) {
835         // batch ensures if there is a failure we get an exception instead
836         results[i++] = (Result) o;
837       }
838 
839       return results;
840     } catch (InterruptedException e) {
841       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
842     }
843   }
844 
845   /**
846    * {@inheritDoc}
847    */
848   @Override
849   public void batch(final List<?extends Row> actions, final Object[] results)
850       throws InterruptedException, IOException {
851     batchCallback(actions, results, null);
852   }
853 
854   /**
855    * {@inheritDoc}
856    * @deprecated If any exception is thrown by one of the actions, there is no way to
857    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
858    */
859   @Override
860   public Object[] batch(final List<? extends Row> actions)
861      throws InterruptedException, IOException {
862     return batchCallback(actions, null);
863   }
864 
865   /**
866    * {@inheritDoc}
867    */
868   @Override
869   public <R> void batchCallback(
870       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
871       throws IOException, InterruptedException {
872     connection.processBatchCallback(actions, tableName, pool, results, callback);
873   }
874 
875   /**
876    * {@inheritDoc}
877    * @deprecated If any exception is thrown by one of the actions, there is no way to
878    * retrieve the partially executed results. Use
879    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
880    * instead.
881    */
882   @Override
883   public <R> Object[] batchCallback(
884     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
885       InterruptedException {
886     Object[] results = new Object[actions.size()];
887     batchCallback(actions, results, callback);
888     return results;
889   }
890 
891   /**
892    * {@inheritDoc}
893    */
894   @Override
895   public void delete(final Delete delete)
896   throws IOException {
897     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
898         tableName, delete.getRow()) {
899       public Boolean call() throws IOException {
900         try {
901           MutateRequest request = RequestConverter.buildMutateRequest(
902             getLocation().getRegionInfo().getRegionName(), delete);
903               PayloadCarryingRpcController controller = rpcControllerFactory.newController();
904               controller.setPriority(tableName);
905               MutateResponse response = getStub().mutate(controller, request);
906           return Boolean.valueOf(response.getProcessed());
907         } catch (ServiceException se) {
908           throw ProtobufUtil.getRemoteException(se);
909         }
910       }
911     };
912     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
913   }
914 
915   /**
916    * {@inheritDoc}
917    */
918   @Override
919   public void delete(final List<Delete> deletes)
920   throws IOException {
921     Object[] results = new Object[deletes.size()];
922     try {
923       batch(deletes, results);
924     } catch (InterruptedException e) {
925       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
926     } finally {
927       // mutate list so that it is empty for complete success, or contains only failed records
928       // results are returned in the same order as the requests in list
929       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
930       for (int i = results.length - 1; i>=0; i--) {
931         // if result is not null, it succeeded
932         if (results[i] instanceof Result) {
933           deletes.remove(i);
934         }
935       }
936     }
937   }
938 
939   /**
940    * {@inheritDoc}
941    */
942   @Override
943   public void put(final Put put)
944       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
945     doPut(put);
946     if (autoFlush) {
947       flushCommits();
948     }
949   }
950 
951   /**
952    * {@inheritDoc}
953    */
954   @Override
955   public void put(final List<Put> puts)
956       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
957     for (Put put : puts) {
958       doPut(put);
959     }
960     if (autoFlush) {
961       flushCommits();
962     }
963   }
964 
965 
966   /**
967    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
968    *  cluster.
969    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
970    * @throws InterruptedIOException if we were interrupted.
971    */
972   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
973     if (ap.hasError()){
974       writeAsyncBuffer.add(put);
975       backgroundFlushCommits(true);
976     }
977 
978     validatePut(put);
979 
980     currentWriteBufferSize += put.heapSize();
981     writeAsyncBuffer.add(put);
982 
983     while (currentWriteBufferSize > writeBufferSize) {
984       backgroundFlushCommits(false);
985     }
986   }
987 
988 
989   /**
990    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
991    * If the is an error (max retried reach from a previous flush or bad operation), it tries to
992    * send all operations in the buffer and sends an exception.
993    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
994    *                     returning.
995    */
996   private void backgroundFlushCommits(boolean synchronous) throws
997       InterruptedIOException, RetriesExhaustedWithDetailsException {
998 
999     try {
1000       do {
1001         ap.submit(writeAsyncBuffer, true);
1002       } while (synchronous && !writeAsyncBuffer.isEmpty());
1003 
1004       if (synchronous) {
1005         ap.waitUntilDone();
1006       }
1007 
1008       if (ap.hasError()) {
1009         LOG.debug(tableName + ": One or more of the operations have failed -" +
1010             " waiting for all operation in progress to finish (successfully or not)");
1011         while (!writeAsyncBuffer.isEmpty()) {
1012           ap.submit(writeAsyncBuffer, true);
1013         }
1014         ap.waitUntilDone();
1015 
1016         if (!clearBufferOnFail) {
1017           // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
1018           //  write buffer. This is a questionable feature kept here for backward compatibility
1019           writeAsyncBuffer.addAll(ap.getFailedOperations());
1020         }
1021         RetriesExhaustedWithDetailsException e = ap.getErrors();
1022         ap.clearErrors();
1023         throw e;
1024       }
1025     } finally {
1026       currentWriteBufferSize = 0;
1027       for (Row mut : writeAsyncBuffer) {
1028         if (mut instanceof Mutation) {
1029           currentWriteBufferSize += ((Mutation) mut).heapSize();
1030         }
1031       }
1032     }
1033   }
1034 
1035   /**
1036    * {@inheritDoc}
1037    */
1038   @Override
1039   public void mutateRow(final RowMutations rm) throws IOException {
1040     RegionServerCallable<Void> callable =
1041         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1042       public Void call() throws IOException {
1043         try {
1044           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1045             getLocation().getRegionInfo().getRegionName(), rm);
1046           regionMutationBuilder.setAtomic(true);
1047           MultiRequest request =
1048             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1049           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1050           controller.setPriority(tableName);
1051           ClientProtos.MultiResponse response = getStub().multi(controller, request);
1052           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1053           if (res.hasException()) {
1054             Throwable ex = ProtobufUtil.toException(res.getException());
1055             if(ex instanceof IOException) {
1056               throw (IOException)ex;
1057             }
1058             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1059           }
1060         } catch (ServiceException se) {
1061           throw ProtobufUtil.getRemoteException(se);
1062         }
1063         return null;
1064       }
1065     };
1066     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1067   }
1068 
1069   /**
1070    * {@inheritDoc}
1071    */
1072   @Override
1073   public Result append(final Append append) throws IOException {
1074     if (append.numFamilies() == 0) {
1075       throw new IOException(
1076           "Invalid arguments to append, no columns specified");
1077     }
1078 
1079     NonceGenerator ng = this.connection.getNonceGenerator();
1080     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1081     RegionServerCallable<Result> callable =
1082       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1083         public Result call() throws IOException {
1084           try {
1085             MutateRequest request = RequestConverter.buildMutateRequest(
1086               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1087             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1088             rpcController.setPriority(getTableName());
1089             MutateResponse response = getStub().mutate(rpcController, request);
1090             if (!response.hasResult()) return null;
1091             return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1092           } catch (ServiceException se) {
1093             throw ProtobufUtil.getRemoteException(se);
1094           }
1095         }
1096       };
1097     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1098   }
1099 
1100   /**
1101    * {@inheritDoc}
1102    */
1103   @Override
1104   public Result increment(final Increment increment) throws IOException {
1105     if (!increment.hasFamilies()) {
1106       throw new IOException(
1107           "Invalid arguments to increment, no columns specified");
1108     }
1109     NonceGenerator ng = this.connection.getNonceGenerator();
1110     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1111     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1112         getName(), increment.getRow()) {
1113       public Result call() throws IOException {
1114         try {
1115           MutateRequest request = RequestConverter.buildMutateRequest(
1116             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1117           PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1118           rpcController.setPriority(getTableName());
1119           MutateResponse response = getStub().mutate(rpcController, request);
1120           return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1121         } catch (ServiceException se) {
1122           throw ProtobufUtil.getRemoteException(se);
1123         }
1124       }
1125     };
1126     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1127   }
1128 
1129   /**
1130    * {@inheritDoc}
1131    */
1132   @Override
1133   public long incrementColumnValue(final byte [] row, final byte [] family,
1134       final byte [] qualifier, final long amount)
1135   throws IOException {
1136     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1137   }
1138 
1139   /**
1140    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1141    */
1142   @Deprecated
1143   @Override
1144   public long incrementColumnValue(final byte [] row, final byte [] family,
1145       final byte [] qualifier, final long amount, final boolean writeToWAL)
1146   throws IOException {
1147     return incrementColumnValue(row, family, qualifier, amount,
1148       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1149   }
1150 
1151   /**
1152    * {@inheritDoc}
1153    */
1154   @Override
1155   public long incrementColumnValue(final byte [] row, final byte [] family,
1156       final byte [] qualifier, final long amount, final Durability durability)
1157   throws IOException {
1158     NullPointerException npe = null;
1159     if (row == null) {
1160       npe = new NullPointerException("row is null");
1161     } else if (family == null) {
1162       npe = new NullPointerException("family is null");
1163     } else if (qualifier == null) {
1164       npe = new NullPointerException("qualifier is null");
1165     }
1166     if (npe != null) {
1167       throw new IOException(
1168           "Invalid arguments to incrementColumnValue", npe);
1169     }
1170 
1171     NonceGenerator ng = this.connection.getNonceGenerator();
1172     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1173     RegionServerCallable<Long> callable =
1174       new RegionServerCallable<Long>(connection, getName(), row) {
1175         public Long call() throws IOException {
1176           try {
1177             MutateRequest request = RequestConverter.buildIncrementRequest(
1178               getLocation().getRegionInfo().getRegionName(), row, family,
1179               qualifier, amount, durability, nonceGroup, nonce);
1180             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1181             rpcController.setPriority(getTableName());
1182             MutateResponse response = getStub().mutate(rpcController, request);
1183             Result result =
1184               ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1185             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1186           } catch (ServiceException se) {
1187             throw ProtobufUtil.getRemoteException(se);
1188           }
1189         }
1190       };
1191     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1192   }
1193 
1194   /**
1195    * {@inheritDoc}
1196    */
1197   @Override
1198   public boolean checkAndPut(final byte [] row,
1199       final byte [] family, final byte [] qualifier, final byte [] value,
1200       final Put put)
1201   throws IOException {
1202     RegionServerCallable<Boolean> callable =
1203       new RegionServerCallable<Boolean>(connection, getName(), row) {
1204         public Boolean call() throws IOException {
1205           try {
1206             MutateRequest request = RequestConverter.buildMutateRequest(
1207               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1208                 new BinaryComparator(value), CompareType.EQUAL, put);
1209             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1210             rpcController.setPriority(getTableName());
1211             MutateResponse response = getStub().mutate(rpcController, request);
1212             return Boolean.valueOf(response.getProcessed());
1213           } catch (ServiceException se) {
1214             throw ProtobufUtil.getRemoteException(se);
1215           }
1216         }
1217       };
1218     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1219   }
1220 
1221 
1222   /**
1223    * {@inheritDoc}
1224    */
1225   @Override
1226   public boolean checkAndDelete(final byte [] row,
1227       final byte [] family, final byte [] qualifier, final byte [] value,
1228       final Delete delete)
1229   throws IOException {
1230     RegionServerCallable<Boolean> callable =
1231       new RegionServerCallable<Boolean>(connection, getName(), row) {
1232         public Boolean call() throws IOException {
1233           try {
1234             MutateRequest request = RequestConverter.buildMutateRequest(
1235               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1236                 new BinaryComparator(value), CompareType.EQUAL, delete);
1237             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1238             rpcController.setPriority(getTableName());
1239             MutateResponse response = getStub().mutate(rpcController, request);
1240             return Boolean.valueOf(response.getProcessed());
1241           } catch (ServiceException se) {
1242             throw ProtobufUtil.getRemoteException(se);
1243           }
1244         }
1245       };
1246     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1247   }
1248 
1249   /**
1250    * {@inheritDoc}
1251    */
1252   @Override
1253   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1254       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1255   throws IOException {
1256     RegionServerCallable<Boolean> callable =
1257         new RegionServerCallable<Boolean>(connection, getName(), row) {
1258           @Override
1259           public Boolean call() throws IOException {
1260             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1261             controller.setPriority(tableName);
1262             try {
1263               CompareType compareType = CompareType.valueOf(compareOp.name());
1264               MultiRequest request = RequestConverter.buildMutateRequest(
1265                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1266                   new BinaryComparator(value), compareType, rm);
1267               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1268               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1269               if (res.hasException()) {
1270                 Throwable ex = ProtobufUtil.toException(res.getException());
1271                 if(ex instanceof IOException) {
1272                   throw (IOException)ex;
1273                 }
1274                 throw new IOException("Failed to checkAndMutate row: "+
1275                     Bytes.toStringBinary(rm.getRow()), ex);
1276               }
1277               return Boolean.valueOf(response.getProcessed());
1278             } catch (ServiceException se) {
1279               throw ProtobufUtil.getRemoteException(se);
1280             }
1281           }
1282         };
1283     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1284   }
1285 
1286   /**
1287    * {@inheritDoc}
1288    */
1289   @Override
1290   public boolean exists(final Get get) throws IOException {
1291     get.setCheckExistenceOnly(true);
1292     Result r = get(get);
1293     assert r.getExists() != null;
1294     return r.getExists();
1295   }
1296 
1297   /**
1298    * {@inheritDoc}
1299    */
1300   @Override
1301   public Boolean[] exists(final List<Get> gets) throws IOException {
1302     if (gets.isEmpty()) return new Boolean[]{};
1303     if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1304 
1305     for (Get g: gets){
1306       g.setCheckExistenceOnly(true);
1307     }
1308 
1309     Object[] r1;
1310     try {
1311       r1 = batch(gets);
1312     } catch (InterruptedException e) {
1313       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1314     }
1315 
1316     // translate.
1317     Boolean[] results = new Boolean[r1.length];
1318     int i = 0;
1319     for (Object o : r1) {
1320       // batch ensures if there is a failure we get an exception instead
1321       results[i++] = ((Result)o).getExists();
1322     }
1323 
1324     return results;
1325   }
1326 
1327   /**
1328    * {@inheritDoc}
1329    */
1330   @Override
1331   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1332     // As we can have an operation in progress even if the buffer is empty, we call
1333     //  backgroundFlushCommits at least one time.
1334     backgroundFlushCommits(true);
1335   }
1336 
1337   /**
1338    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1339    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1340    *
1341    * @param list The collection of actions.
1342    * @param results An empty array, same size as list. If an exception is thrown,
1343    * you can test here for partial results, and to determine which actions
1344    * processed successfully.
1345    * @throws IOException if there are problems talking to META. Per-item
1346    * exceptions are stored in the results array.
1347    */
1348   public <R> void processBatchCallback(
1349     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1350     throws IOException, InterruptedException {
1351     this.batchCallback(list, results, callback);
1352   }
1353 
1354 
1355   /**
1356    * Parameterized batch processing, allowing varying return types for different
1357    * {@link Row} implementations.
1358    */
1359   public void processBatch(final List<? extends Row> list, final Object[] results)
1360     throws IOException, InterruptedException {
1361 
1362     this.processBatchCallback(list, results, null);
1363   }
1364 
1365 
1366   @Override
1367   public void close() throws IOException {
1368     if (this.closed) {
1369       return;
1370     }
1371     flushCommits();
1372     if (cleanupPoolOnClose) {
1373       this.pool.shutdown();
1374     }
1375     if (cleanupConnectionOnClose) {
1376       if (this.connection != null) {
1377         this.connection.close();
1378       }
1379     }
1380     this.closed = true;
1381   }
1382 
1383   // validate for well-formedness
1384   public void validatePut(final Put put) throws IllegalArgumentException {
1385     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1386   }
1387 
1388   // validate for well-formedness
1389   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1390     if (put.isEmpty()) {
1391       throw new IllegalArgumentException("No columns to insert");
1392     }
1393     if (maxKeyValueSize > 0) {
1394       for (List<Cell> list : put.getFamilyCellMap().values()) {
1395         for (Cell cell : list) {
1396           // KeyValue v1 expectation.  Cast for now.
1397           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1398           if (kv.getLength() > maxKeyValueSize) {
1399             throw new IllegalArgumentException("KeyValue size too large");
1400           }
1401         }
1402       }
1403     }
1404   }
1405 
1406   /**
1407    * {@inheritDoc}
1408    */
1409   @Override
1410   public boolean isAutoFlush() {
1411     return autoFlush;
1412   }
1413 
1414   /**
1415    * {@inheritDoc}
1416    */
1417   @Deprecated
1418   @Override
1419   public void setAutoFlush(boolean autoFlush) {
1420     setAutoFlush(autoFlush, autoFlush);
1421   }
1422 
1423   /**
1424    * {@inheritDoc}
1425    */
1426   @Override
1427   public void setAutoFlushTo(boolean autoFlush) {
1428     setAutoFlush(autoFlush, clearBufferOnFail);
1429   }
1430 
1431   /**
1432    * {@inheritDoc}
1433    */
1434   @Override
1435   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1436     this.autoFlush = autoFlush;
1437     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1438   }
1439 
1440   /**
1441    * Returns the maximum size in bytes of the write buffer for this HTable.
1442    * <p>
1443    * The default value comes from the configuration parameter
1444    * {@code hbase.client.write.buffer}.
1445    * @return The size of the write buffer in bytes.
1446    */
1447   @Override
1448   public long getWriteBufferSize() {
1449     return writeBufferSize;
1450   }
1451 
1452   /**
1453    * Sets the size of the buffer in bytes.
1454    * <p>
1455    * If the new size is less than the current amount of data in the
1456    * write buffer, the buffer gets flushed.
1457    * @param writeBufferSize The new write buffer size, in bytes.
1458    * @throws IOException if a remote or network exception occurs.
1459    */
1460   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1461     this.writeBufferSize = writeBufferSize;
1462     if(currentWriteBufferSize > writeBufferSize) {
1463       flushCommits();
1464     }
1465   }
1466 
1467   /**
1468    * The pool is used for mutli requests for this HTable
1469    * @return the pool used for mutli
1470    */
1471   ExecutorService getPool() {
1472     return this.pool;
1473   }
1474 
1475   /**
1476    * Enable or disable region cache prefetch for the table. It will be
1477    * applied for the given table's all HTable instances who share the same
1478    * connection. By default, the cache prefetch is enabled.
1479    * @param tableName name of table to configure.
1480    * @param enable Set to true to enable region cache prefetch. Or set to
1481    * false to disable it.
1482    * @throws IOException
1483    */
1484   public static void setRegionCachePrefetch(final byte[] tableName,
1485       final boolean enable) throws IOException {
1486     setRegionCachePrefetch(TableName.valueOf(tableName), enable);
1487   }
1488 
1489   public static void setRegionCachePrefetch(
1490       final TableName tableName,
1491       final boolean enable) throws IOException {
1492     HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration.create()) {
1493       @Override
1494       public Void connect(HConnection connection) throws IOException {
1495         connection.setRegionCachePrefetch(tableName, enable);
1496         return null;
1497       }
1498     });
1499   }
1500 
1501   /**
1502    * Enable or disable region cache prefetch for the table. It will be
1503    * applied for the given table's all HTable instances who share the same
1504    * connection. By default, the cache prefetch is enabled.
1505    * @param conf The Configuration object to use.
1506    * @param tableName name of table to configure.
1507    * @param enable Set to true to enable region cache prefetch. Or set to
1508    * false to disable it.
1509    * @throws IOException
1510    */
1511   public static void setRegionCachePrefetch(final Configuration conf,
1512       final byte[] tableName, final boolean enable) throws IOException {
1513     setRegionCachePrefetch(conf, TableName.valueOf(tableName), enable);
1514   }
1515 
1516   public static void setRegionCachePrefetch(final Configuration conf,
1517       final TableName tableName,
1518       final boolean enable) throws IOException {
1519     HConnectionManager.execute(new HConnectable<Void>(conf) {
1520       @Override
1521       public Void connect(HConnection connection) throws IOException {
1522         connection.setRegionCachePrefetch(tableName, enable);
1523         return null;
1524       }
1525     });
1526   }
1527 
1528   /**
1529    * Check whether region cache prefetch is enabled or not for the table.
1530    * @param conf The Configuration object to use.
1531    * @param tableName name of table to check
1532    * @return true if table's region cache prefecth is enabled. Otherwise
1533    * it is disabled.
1534    * @throws IOException
1535    */
1536   public static boolean getRegionCachePrefetch(final Configuration conf,
1537       final byte[] tableName) throws IOException {
1538     return getRegionCachePrefetch(conf, TableName.valueOf(tableName));
1539   }
1540 
1541   public static boolean getRegionCachePrefetch(final Configuration conf,
1542       final TableName tableName) throws IOException {
1543     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1544       @Override
1545       public Boolean connect(HConnection connection) throws IOException {
1546         return connection.getRegionCachePrefetch(tableName);
1547       }
1548     });
1549   }
1550 
1551   /**
1552    * Check whether region cache prefetch is enabled or not for the table.
1553    * @param tableName name of table to check
1554    * @return true if table's region cache prefecth is enabled. Otherwise
1555    * it is disabled.
1556    * @throws IOException
1557    */
1558   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1559     return getRegionCachePrefetch(TableName.valueOf(tableName));
1560   }
1561 
1562   public static boolean getRegionCachePrefetch(
1563       final TableName tableName) throws IOException {
1564     return HConnectionManager.execute(new HConnectable<Boolean>(
1565         HBaseConfiguration.create()) {
1566       @Override
1567       public Boolean connect(HConnection connection) throws IOException {
1568         return connection.getRegionCachePrefetch(tableName);
1569       }
1570     });
1571   }
1572 
1573   /**
1574    * Explicitly clears the region cache to fetch the latest value from META.
1575    * This is a power user function: avoid unless you know the ramifications.
1576    */
1577   public void clearRegionCache() {
1578     this.connection.clearRegionCache();
1579   }
1580 
1581   /**
1582    * {@inheritDoc}
1583    */
1584   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1585     return new RegionCoprocessorRpcChannel(connection, tableName, row, rpcCallerFactory,
1586         rpcControllerFactory);
1587   }
1588 
1589   /**
1590    * {@inheritDoc}
1591    */
1592   @Override
1593   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1594       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1595       throws ServiceException, Throwable {
1596     final Map<byte[],R> results =  Collections.synchronizedMap(
1597         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1598     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1599       public void update(byte[] region, byte[] row, R value) {
1600         if (region != null) {
1601           results.put(region, value);
1602         }
1603       }
1604     });
1605     return results;
1606   }
1607 
1608   /**
1609    * {@inheritDoc}
1610    */
1611   @Override
1612   public <T extends Service, R> void coprocessorService(final Class<T> service,
1613       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1614       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1615 
1616     // get regions covered by the row range
1617     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1618 
1619     Map<byte[],Future<R>> futures =
1620         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1621     for (final byte[] r : keys) {
1622       final RegionCoprocessorRpcChannel channel =
1623           new RegionCoprocessorRpcChannel(connection, tableName, r, rpcCallerFactory,
1624               rpcControllerFactory);
1625       Future<R> future = pool.submit(
1626           new Callable<R>() {
1627             public R call() throws Exception {
1628               T instance = ProtobufUtil.newServiceStub(service, channel);
1629               R result = callable.call(instance);
1630               byte[] region = channel.getLastRegion();
1631               if (callback != null) {
1632                 callback.update(region, r, result);
1633               }
1634               return result;
1635             }
1636           });
1637       futures.put(r, future);
1638     }
1639     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1640       try {
1641         e.getValue().get();
1642       } catch (ExecutionException ee) {
1643         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1644             + Bytes.toStringBinary(e.getKey()), ee);
1645         throw ee.getCause();
1646       } catch (InterruptedException ie) {
1647         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1648             + " for row " + Bytes.toStringBinary(e.getKey()))
1649             .initCause(ie);
1650       }
1651     }
1652   }
1653 
1654   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1655   throws IOException {
1656     if (start == null) {
1657       start = HConstants.EMPTY_START_ROW;
1658     }
1659     if (end == null) {
1660       end = HConstants.EMPTY_END_ROW;
1661     }
1662     return getKeysAndRegionsInRange(start, end, true).getFirst();
1663   }
1664 
1665   public void setOperationTimeout(int operationTimeout) {
1666     this.operationTimeout = operationTimeout;
1667   }
1668 
1669   public int getOperationTimeout() {
1670     return operationTimeout;
1671   }
1672 
1673   @Override
1674   public String toString() {
1675     return tableName + ";" + connection;
1676   }
1677 
1678   /**
1679    * Run basic test.
1680    * @param args Pass table name and row and will get the content.
1681    * @throws IOException
1682    */
1683   public static void main(String[] args) throws IOException {
1684     HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1685     try {
1686       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1687     } finally {
1688       t.close();
1689     }
1690   }
1691 
1692   /**
1693    * {@inheritDoc}
1694    */
1695   @Override
1696   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1697       Descriptors.MethodDescriptor methodDescriptor, Message request,
1698       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1699     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1700         Bytes.BYTES_COMPARATOR));
1701     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1702         new Callback<R>() {
1703 
1704           @Override
1705           public void update(byte[] region, byte[] row, R result) {
1706             if (region != null) {
1707               results.put(region, result);
1708             }
1709           }
1710         });
1711     return results;
1712   }
1713 
1714   /**
1715    * {@inheritDoc}
1716    */
1717   @Override
1718   public <R extends Message> void batchCoprocessorService(
1719       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1720       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1721       throws ServiceException, Throwable {
1722 
1723     // get regions covered by the row range
1724     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1725         getKeysAndRegionsInRange(startKey, endKey, true);
1726     List<byte[]> keys = keysAndRegions.getFirst();
1727     List<HRegionLocation> regions = keysAndRegions.getSecond();
1728 
1729     // check if we have any calls to make
1730     if (keys.isEmpty()) {
1731       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1732           ", end=" + Bytes.toStringBinary(endKey));
1733       return;
1734     }
1735 
1736     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1737     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1738         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1739     for (int i = 0; i < keys.size(); i++) {
1740       final byte[] rowKey = keys.get(i);
1741       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1742       RegionCoprocessorServiceExec exec =
1743           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1744       execs.add(exec);
1745       execsByRow.put(rowKey, exec);
1746     }
1747 
1748     // tracking for any possible deserialization errors on success callback
1749     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1750     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1751     final List<Row> callbackErrorActions = new ArrayList<Row>();
1752     final List<String> callbackErrorServers = new ArrayList<String>();
1753 
1754     AsyncProcess<ClientProtos.CoprocessorServiceResult> asyncProcess =
1755         new AsyncProcess<ClientProtos.CoprocessorServiceResult>(connection, tableName, pool,
1756             new AsyncProcess.AsyncProcessCallback<ClientProtos.CoprocessorServiceResult>() {
1757           @SuppressWarnings("unchecked")
1758           @Override
1759           public void success(int originalIndex, byte[] region, Row row,
1760               ClientProtos.CoprocessorServiceResult serviceResult) {
1761             if (LOG.isTraceEnabled()) {
1762               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1763                 " call #" + originalIndex + ": region=" + Bytes.toStringBinary(region) +
1764                 ", row=" + Bytes.toStringBinary(row.getRow()) +
1765                 ", value=" + serviceResult.getValue().getValue());
1766             }
1767             try {
1768               callback.update(region, row.getRow(),
1769                 (R) responsePrototype.newBuilderForType().mergeFrom(
1770                   serviceResult.getValue().getValue()).build());
1771             } catch (InvalidProtocolBufferException e) {
1772               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1773                 e);
1774               callbackErrorExceptions.add(e);
1775               callbackErrorActions.add(row);
1776               callbackErrorServers.add("null");
1777             }
1778           }
1779 
1780           @Override
1781           public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
1782             RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1783             LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1784                 + Bytes.toStringBinary(exec.getRegion()), t);
1785             return true;
1786           }
1787 
1788           @Override
1789           public boolean retriableFailure(int originalIndex, Row row, byte[] region,
1790               Throwable exception) {
1791             RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1792             LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1793                 + Bytes.toStringBinary(exec.getRegion()), exception);
1794             return !(exception instanceof DoNotRetryIOException);
1795           }
1796         },
1797         configuration, rpcCallerFactory, rpcControllerFactory);
1798 
1799     asyncProcess.submitAll(execs);
1800     asyncProcess.waitUntilDone();
1801 
1802     if (asyncProcess.hasError()) {
1803       throw asyncProcess.getErrors();
1804     } else if (!callbackErrorExceptions.isEmpty()) {
1805       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1806         callbackErrorServers);
1807     }
1808   }
1809 }