View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.coprocessor.Batch;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.PoolMap;
38  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39  
40  import com.google.protobuf.Descriptors;
41  import com.google.protobuf.Message;
42  import com.google.protobuf.Service;
43  import com.google.protobuf.ServiceException;
44  
45  /**
46   * A simple pool of HTable instances.
47   *
48   * Each HTablePool acts as a pool for all tables. To use, instantiate an
49   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
50   *
 * <p>
 * Once you are done with a table, close your instance of
 * {@link HTableInterface} by calling {@link HTableInterface#close()}; this
 * hands the table back to the pool. Returning tables explicitly with the
 * deprecated {@link #putTable(HTableInterface)} is not needed anymore.
57   *
58   * <p>
59   * A pool can be created with a <i>maxSize</i> which defines the most HTable
60   * references that will ever be retained for each table. Otherwise the default
61   * is {@link Integer#MAX_VALUE}.
62   *
63   * <p>
64   * Pool will manage its own connections to the cluster. See
65   * {@link HConnectionManager}.
66   * @deprecated as of 0.98.1. See {@link HConnection#getTable(String)}.
67   */
68  @InterfaceAudience.Private
69  @Deprecated
70  public class HTablePool implements Closeable {
71    private final PoolMap<String, HTableInterface> tables;
72    private final int maxSize;
73    private final PoolType poolType;
74    private final Configuration config;
75    private final HTableInterfaceFactory tableFactory;
76  
77    /**
78     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
79     */
80    public HTablePool() {
81      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82    }
83  
84    /**
85     * Constructor to set maximum versions and use the specified configuration.
86     *
87     * @param config
88     *          configuration
89     * @param maxSize
90     *          maximum number of references to keep for each table
91     */
92    public HTablePool(final Configuration config, final int maxSize) {
93      this(config, maxSize, null, null);
94    }
95  
96    /**
97     * Constructor to set maximum versions and use the specified configuration and
98     * table factory.
99     *
100    * @param config
101    *          configuration
102    * @param maxSize
103    *          maximum number of references to keep for each table
104    * @param tableFactory
105    *          table factory
106    */
107   public HTablePool(final Configuration config, final int maxSize,
108       final HTableInterfaceFactory tableFactory) {
109     this(config, maxSize, tableFactory, PoolType.Reusable);
110   }
111 
112   /**
113    * Constructor to set maximum versions and use the specified configuration and
114    * pool type.
115    *
116    * @param config
117    *          configuration
118    * @param maxSize
119    *          maximum number of references to keep for each table
120    * @param poolType
121    *          pool type which is one of {@link PoolType#Reusable} or
122    *          {@link PoolType#ThreadLocal}
123    */
124   public HTablePool(final Configuration config, final int maxSize,
125       final PoolType poolType) {
126     this(config, maxSize, null, poolType);
127   }
128 
129   /**
130    * Constructor to set maximum versions and use the specified configuration,
131    * table factory and pool type. The HTablePool supports the
132    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
133    * type is null or not one of those two values, then it will default to
134    * {@link PoolType#Reusable}.
135    *
136    * @param config
137    *          configuration
138    * @param maxSize
139    *          maximum number of references to keep for each table
140    * @param tableFactory
141    *          table factory
142    * @param poolType
143    *          pool type which is one of {@link PoolType#Reusable} or
144    *          {@link PoolType#ThreadLocal}
145    */
146   public HTablePool(final Configuration config, final int maxSize,
147       final HTableInterfaceFactory tableFactory, PoolType poolType) {
148     // Make a new configuration instance so I can safely cleanup when
149     // done with the pool.
150     this.config = config == null ? HBaseConfiguration.create() : config;
151     this.maxSize = maxSize;
152     this.tableFactory = tableFactory == null ? new HTableFactory()
153         : tableFactory;
154     if (poolType == null) {
155       this.poolType = PoolType.Reusable;
156     } else {
157       switch (poolType) {
158       case Reusable:
159       case ThreadLocal:
160         this.poolType = poolType;
161         break;
162       default:
163         this.poolType = PoolType.Reusable;
164         break;
165       }
166     }
167     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168         this.maxSize);
169   }
170 
171   /**
172    * Get a reference to the specified table from the pool.
173    * <p>
174    * <p/>
175    *
176    * @param tableName
177    *          table name
178    * @return a reference to the specified table
179    * @throws RuntimeException
180    *           if there is a problem instantiating the HTable
181    */
182   public HTableInterface getTable(String tableName) {
183     // call the old getTable implementation renamed to findOrCreateTable
184     HTableInterface table = findOrCreateTable(tableName);
185     // return a proxy table so when user closes the proxy, the actual table
186     // will be returned to the pool
187     return new PooledHTable(table);
188   }
189 
190   /**
191    * Get a reference to the specified table from the pool.
192    * <p>
193    *
194    * Create a new one if one is not available.
195    *
196    * @param tableName
197    *          table name
198    * @return a reference to the specified table
199    * @throws RuntimeException
200    *           if there is a problem instantiating the HTable
201    */
202   private HTableInterface findOrCreateTable(String tableName) {
203     HTableInterface table = tables.get(tableName);
204     if (table == null) {
205       table = createHTable(tableName);
206     }
207     return table;
208   }
209 
210   /**
211    * Get a reference to the specified table from the pool.
212    * <p>
213    *
214    * Create a new one if one is not available.
215    *
216    * @param tableName
217    *          table name
218    * @return a reference to the specified table
219    * @throws RuntimeException
220    *           if there is a problem instantiating the HTable
221    */
222   public HTableInterface getTable(byte[] tableName) {
223     return getTable(Bytes.toString(tableName));
224   }
225 
226   /**
227    * This method is not needed anymore, clients should call
228    * HTableInterface.close() rather than returning the tables to the pool
229    *
230    * @param table
231    *          the proxy table user got from pool
232    * @deprecated
233    */
234   public void putTable(HTableInterface table) throws IOException {
235     // we need to be sure nobody puts a proxy implementation in the pool
236     // but if the client code is not updated
237     // and it will continue to call putTable() instead of calling close()
238     // then we need to return the wrapped table to the pool instead of the
239     // proxy
240     // table
241     if (table instanceof PooledHTable) {
242       returnTable(((PooledHTable) table).getWrappedTable());
243     } else {
244       // normally this should not happen if clients pass back the same
245       // table
246       // object they got from the pool
247       // but if it happens then it's better to reject it
248       throw new IllegalArgumentException("not a pooled table: " + table);
249     }
250   }
251 
252   /**
253    * Puts the specified HTable back into the pool.
254    * <p>
255    *
256    * If the pool already contains <i>maxSize</i> references to the table, then
257    * the table instance gets closed after flushing buffered edits.
258    *
259    * @param table
260    *          table
261    */
262   private void returnTable(HTableInterface table) throws IOException {
263     // this is the old putTable method renamed and made private
264     String tableName = Bytes.toString(table.getTableName());
265     if (tables.size(tableName) >= maxSize) {
266       // release table instance since we're not reusing it
267       this.tables.removeValue(tableName, table);
268       this.tableFactory.releaseHTableInterface(table);
269       return;
270     }
271     tables.put(tableName, table);
272   }
273 
274   protected HTableInterface createHTable(String tableName) {
275     return this.tableFactory.createHTableInterface(config,
276         Bytes.toBytes(tableName));
277   }
278 
279   /**
280    * Closes all the HTable instances , belonging to the given table, in the
281    * table pool.
282    * <p>
283    * Note: this is a 'shutdown' of the given table pool and different from
284    * {@link #putTable(HTableInterface)}, that is used to return the table
285    * instance to the pool for future re-use.
286    *
287    * @param tableName
288    */
289   public void closeTablePool(final String tableName) throws IOException {
290     Collection<HTableInterface> tables = this.tables.values(tableName);
291     if (tables != null) {
292       for (HTableInterface table : tables) {
293         this.tableFactory.releaseHTableInterface(table);
294       }
295     }
296     this.tables.remove(tableName);
297   }
298 
299   /**
300    * See {@link #closeTablePool(String)}.
301    *
302    * @param tableName
303    */
304   public void closeTablePool(final byte[] tableName) throws IOException {
305     closeTablePool(Bytes.toString(tableName));
306   }
307 
308   /**
309    * Closes all the HTable instances , belonging to all tables in the table
310    * pool.
311    * <p>
312    * Note: this is a 'shutdown' of all the table pools.
313    */
314   public void close() throws IOException {
315     for (String tableName : tables.keySet()) {
316       closeTablePool(tableName);
317     }
318     this.tables.clear();
319   }
320 
321   public int getCurrentPoolSize(String tableName) {
322     return tables.size(tableName);
323   }
324 
325   /**
326    * A proxy class that implements HTableInterface.close method to return the
327    * wrapped table back to the table pool
328    *
329    */
330   class PooledHTable implements HTableInterface {
331 
332     private boolean open = false;
333 
334     private HTableInterface table; // actual table implementation
335 
336     public PooledHTable(HTableInterface table) {
337       this.table = table;
338       this.open = true;
339     }
340 
341     @Override
342     public byte[] getTableName() {
343       checkState();
344       return table.getTableName();
345     }
346 
347     @Override
348     public TableName getName() {
349       return table.getName();
350     }
351 
352     @Override
353     public Configuration getConfiguration() {
354       checkState();
355       return table.getConfiguration();
356     }
357 
358     @Override
359     public HTableDescriptor getTableDescriptor() throws IOException {
360       checkState();
361       return table.getTableDescriptor();
362     }
363 
364     @Override
365     public boolean exists(Get get) throws IOException {
366       checkState();
367       return table.exists(get);
368     }
369 
370     @Override
371     public Boolean[] exists(List<Get> gets) throws IOException {
372       checkState();
373       return table.exists(gets);
374     }
375 
376     @Override
377     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
378         InterruptedException {
379       checkState();
380       table.batch(actions, results);
381     }
382 
383     /**
384      * {@inheritDoc}
385      * @deprecated If any exception is thrown by one of the actions, there is no way to
386      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
387      */
388     @Override
389     public Object[] batch(List<? extends Row> actions) throws IOException,
390         InterruptedException {
391       checkState();
392       return table.batch(actions);
393     }
394 
395     @Override
396     public Result get(Get get) throws IOException {
397       checkState();
398       return table.get(get);
399     }
400 
401     @Override
402     public Result[] get(List<Get> gets) throws IOException {
403       checkState();
404       return table.get(gets);
405     }
406 
407     @Override
408     @SuppressWarnings("deprecation")
409     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
410       checkState();
411       return table.getRowOrBefore(row, family);
412     }
413 
414     @Override
415     public ResultScanner getScanner(Scan scan) throws IOException {
416       checkState();
417       return table.getScanner(scan);
418     }
419 
420     @Override
421     public ResultScanner getScanner(byte[] family) throws IOException {
422       checkState();
423       return table.getScanner(family);
424     }
425 
426     @Override
427     public ResultScanner getScanner(byte[] family, byte[] qualifier)
428         throws IOException {
429       checkState();
430       return table.getScanner(family, qualifier);
431     }
432 
433     @Override
434     public void put(Put put) throws IOException {
435       checkState();
436       table.put(put);
437     }
438 
439     @Override
440     public void put(List<Put> puts) throws IOException {
441       checkState();
442       table.put(puts);
443     }
444 
445     @Override
446     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
447         byte[] value, Put put) throws IOException {
448       checkState();
449       return table.checkAndPut(row, family, qualifier, value, put);
450     }
451 
452     @Override
453     public void delete(Delete delete) throws IOException {
454       checkState();
455       table.delete(delete);
456     }
457 
458     @Override
459     public void delete(List<Delete> deletes) throws IOException {
460       checkState();
461       table.delete(deletes);
462     }
463 
464     @Override
465     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
466         byte[] value, Delete delete) throws IOException {
467       checkState();
468       return table.checkAndDelete(row, family, qualifier, value, delete);
469     }
470 
471     @Override
472     public Result increment(Increment increment) throws IOException {
473       checkState();
474       return table.increment(increment);
475     }
476 
477     @Override
478     public long incrementColumnValue(byte[] row, byte[] family,
479         byte[] qualifier, long amount) throws IOException {
480       checkState();
481       return table.incrementColumnValue(row, family, qualifier, amount);
482     }
483 
484     @Override
485     public long incrementColumnValue(byte[] row, byte[] family,
486         byte[] qualifier, long amount, Durability durability) throws IOException {
487       checkState();
488       return table.incrementColumnValue(row, family, qualifier, amount,
489           durability);
490     }
491 
492     @Override
493     public boolean isAutoFlush() {
494       checkState();
495       return table.isAutoFlush();
496     }
497 
498     @Override
499     public void flushCommits() throws IOException {
500       checkState();
501       table.flushCommits();
502     }
503 
504     /**
505      * Returns the actual table back to the pool
506      *
507      * @throws IOException
508      */
509     public void close() throws IOException {
510       checkState();
511       open = false;
512       returnTable(table);
513     }
514 
515     @Override
516     public CoprocessorRpcChannel coprocessorService(byte[] row) {
517       checkState();
518       return table.coprocessorService(row);
519     }
520 
521     @Override
522     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
523         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
524         throws ServiceException, Throwable {
525       checkState();
526       return table.coprocessorService(service, startKey, endKey, callable);
527     }
528 
529     @Override
530     public <T extends Service, R> void coprocessorService(Class<T> service,
531         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
532         throws ServiceException, Throwable {
533       checkState();
534       table.coprocessorService(service, startKey, endKey, callable, callback);
535     }
536 
537     @Override
538     public String toString() {
539       return "PooledHTable{" + ", table=" + table + '}';
540     }
541 
542     /**
543      * Expose the wrapped HTable to tests in the same package
544      *
545      * @return wrapped htable
546      */
547     HTableInterface getWrappedTable() {
548       return table;
549     }
550 
551     @Override
552     public <R> void batchCallback(List<? extends Row> actions,
553         Object[] results, Callback<R> callback) throws IOException,
554         InterruptedException {
555       checkState();
556       table.batchCallback(actions, results, callback);
557     }
558 
559     /**
560      * {@inheritDoc}
561      * @deprecated If any exception is thrown by one of the actions, there is no way to
562      * retrieve the partially executed results. Use
563      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
564      * instead.
565      */
566     @Override
567     public <R> Object[] batchCallback(List<? extends Row> actions,
568         Callback<R> callback) throws IOException, InterruptedException {
569       checkState();
570       return table.batchCallback(actions,  callback);
571     }
572 
573     @Override
574     public void mutateRow(RowMutations rm) throws IOException {
575       checkState();
576       table.mutateRow(rm);
577     }
578 
579     @Override
580     public Result append(Append append) throws IOException {
581       checkState();
582       return table.append(append);
583     }
584 
585     @Override
586     public void setAutoFlush(boolean autoFlush) {
587       checkState();
588       table.setAutoFlush(autoFlush, autoFlush);
589     }
590 
591     @Override
592     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
593       checkState();
594       table.setAutoFlush(autoFlush, clearBufferOnFail);
595     }
596 
597     @Override
598     public void setAutoFlushTo(boolean autoFlush) {
599       table.setAutoFlushTo(autoFlush);
600     }
601 
602     @Override
603     public long getWriteBufferSize() {
604       checkState();
605       return table.getWriteBufferSize();
606     }
607 
608     @Override
609     public void setWriteBufferSize(long writeBufferSize) throws IOException {
610       checkState();
611       table.setWriteBufferSize(writeBufferSize);
612     }
613 
614     boolean isOpen() {
615       return open;
616     }
617 
618     private void checkState() {
619       if (!isOpen()) {
620         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
621       }
622     }
623 
624     @Override
625     public long incrementColumnValue(byte[] row, byte[] family,
626         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
627       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
628     }
629 
630     @Override
631     public <R extends Message> Map<byte[], R> batchCoprocessorService(
632         Descriptors.MethodDescriptor method, Message request,
633         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
634       checkState();
635       return table.batchCoprocessorService(method, request, startKey, endKey,
636           responsePrototype);
637     }
638 
639     @Override
640     public <R extends Message> void batchCoprocessorService(
641         Descriptors.MethodDescriptor method, Message request,
642         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
643         throws ServiceException, Throwable {
644       checkState();
645       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
646     }
647 
648     @Override
649     public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
650         byte[] value, RowMutations mutation) throws IOException {
651       checkState();
652       return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
653     }
654   }
655 }