1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HTableDescriptor;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.coprocessor.Batch;
33 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
35 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.PoolMap;
38 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
39
40 import com.google.protobuf.Descriptors;
41 import com.google.protobuf.Message;
42 import com.google.protobuf.Service;
43 import com.google.protobuf.ServiceException;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Private
69 @Deprecated
70 public class HTablePool implements Closeable {
71 private final PoolMap<String, HTableInterface> tables;
72 private final int maxSize;
73 private final PoolType poolType;
74 private final Configuration config;
75 private final HTableInterfaceFactory tableFactory;
76
77
78
79
80 public HTablePool() {
81 this(HBaseConfiguration.create(), Integer.MAX_VALUE);
82 }
83
84
85
86
87
88
89
90
91
92 public HTablePool(final Configuration config, final int maxSize) {
93 this(config, maxSize, null, null);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107 public HTablePool(final Configuration config, final int maxSize,
108 final HTableInterfaceFactory tableFactory) {
109 this(config, maxSize, tableFactory, PoolType.Reusable);
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124 public HTablePool(final Configuration config, final int maxSize,
125 final PoolType poolType) {
126 this(config, maxSize, null, poolType);
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 public HTablePool(final Configuration config, final int maxSize,
147 final HTableInterfaceFactory tableFactory, PoolType poolType) {
148
149
150 this.config = config == null ? HBaseConfiguration.create() : config;
151 this.maxSize = maxSize;
152 this.tableFactory = tableFactory == null ? new HTableFactory()
153 : tableFactory;
154 if (poolType == null) {
155 this.poolType = PoolType.Reusable;
156 } else {
157 switch (poolType) {
158 case Reusable:
159 case ThreadLocal:
160 this.poolType = poolType;
161 break;
162 default:
163 this.poolType = PoolType.Reusable;
164 break;
165 }
166 }
167 this.tables = new PoolMap<String, HTableInterface>(this.poolType,
168 this.maxSize);
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182 public HTableInterface getTable(String tableName) {
183
184 HTableInterface table = findOrCreateTable(tableName);
185
186
187 return new PooledHTable(table);
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202 private HTableInterface findOrCreateTable(String tableName) {
203 HTableInterface table = tables.get(tableName);
204 if (table == null) {
205 table = createHTable(tableName);
206 }
207 return table;
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222 public HTableInterface getTable(byte[] tableName) {
223 return getTable(Bytes.toString(tableName));
224 }
225
226
227
228
229
230
231
232
233
234 public void putTable(HTableInterface table) throws IOException {
235
236
237
238
239
240
241 if (table instanceof PooledHTable) {
242 returnTable(((PooledHTable) table).getWrappedTable());
243 } else {
244
245
246
247
248 throw new IllegalArgumentException("not a pooled table: " + table);
249 }
250 }
251
252
253
254
255
256
257
258
259
260
261
262 private void returnTable(HTableInterface table) throws IOException {
263
264 String tableName = Bytes.toString(table.getTableName());
265 if (tables.size(tableName) >= maxSize) {
266
267 this.tables.removeValue(tableName, table);
268 this.tableFactory.releaseHTableInterface(table);
269 return;
270 }
271 tables.put(tableName, table);
272 }
273
274 protected HTableInterface createHTable(String tableName) {
275 return this.tableFactory.createHTableInterface(config,
276 Bytes.toBytes(tableName));
277 }
278
279
280
281
282
283
284
285
286
287
288
289 public void closeTablePool(final String tableName) throws IOException {
290 Collection<HTableInterface> tables = this.tables.values(tableName);
291 if (tables != null) {
292 for (HTableInterface table : tables) {
293 this.tableFactory.releaseHTableInterface(table);
294 }
295 }
296 this.tables.remove(tableName);
297 }
298
299
300
301
302
303
304 public void closeTablePool(final byte[] tableName) throws IOException {
305 closeTablePool(Bytes.toString(tableName));
306 }
307
308
309
310
311
312
313
314 public void close() throws IOException {
315 for (String tableName : tables.keySet()) {
316 closeTablePool(tableName);
317 }
318 this.tables.clear();
319 }
320
321 public int getCurrentPoolSize(String tableName) {
322 return tables.size(tableName);
323 }
324
325
326
327
328
329
330 class PooledHTable implements HTableInterface {
331
332 private boolean open = false;
333
334 private HTableInterface table;
335
336 public PooledHTable(HTableInterface table) {
337 this.table = table;
338 this.open = true;
339 }
340
341 @Override
342 public byte[] getTableName() {
343 checkState();
344 return table.getTableName();
345 }
346
347 @Override
348 public TableName getName() {
349 return table.getName();
350 }
351
352 @Override
353 public Configuration getConfiguration() {
354 checkState();
355 return table.getConfiguration();
356 }
357
358 @Override
359 public HTableDescriptor getTableDescriptor() throws IOException {
360 checkState();
361 return table.getTableDescriptor();
362 }
363
364 @Override
365 public boolean exists(Get get) throws IOException {
366 checkState();
367 return table.exists(get);
368 }
369
370 @Override
371 public Boolean[] exists(List<Get> gets) throws IOException {
372 checkState();
373 return table.exists(gets);
374 }
375
376 @Override
377 public void batch(List<? extends Row> actions, Object[] results) throws IOException,
378 InterruptedException {
379 checkState();
380 table.batch(actions, results);
381 }
382
383
384
385
386
387
388 @Override
389 public Object[] batch(List<? extends Row> actions) throws IOException,
390 InterruptedException {
391 checkState();
392 return table.batch(actions);
393 }
394
395 @Override
396 public Result get(Get get) throws IOException {
397 checkState();
398 return table.get(get);
399 }
400
401 @Override
402 public Result[] get(List<Get> gets) throws IOException {
403 checkState();
404 return table.get(gets);
405 }
406
407 @Override
408 @SuppressWarnings("deprecation")
409 public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
410 checkState();
411 return table.getRowOrBefore(row, family);
412 }
413
414 @Override
415 public ResultScanner getScanner(Scan scan) throws IOException {
416 checkState();
417 return table.getScanner(scan);
418 }
419
420 @Override
421 public ResultScanner getScanner(byte[] family) throws IOException {
422 checkState();
423 return table.getScanner(family);
424 }
425
426 @Override
427 public ResultScanner getScanner(byte[] family, byte[] qualifier)
428 throws IOException {
429 checkState();
430 return table.getScanner(family, qualifier);
431 }
432
433 @Override
434 public void put(Put put) throws IOException {
435 checkState();
436 table.put(put);
437 }
438
439 @Override
440 public void put(List<Put> puts) throws IOException {
441 checkState();
442 table.put(puts);
443 }
444
445 @Override
446 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
447 byte[] value, Put put) throws IOException {
448 checkState();
449 return table.checkAndPut(row, family, qualifier, value, put);
450 }
451
452 @Override
453 public void delete(Delete delete) throws IOException {
454 checkState();
455 table.delete(delete);
456 }
457
458 @Override
459 public void delete(List<Delete> deletes) throws IOException {
460 checkState();
461 table.delete(deletes);
462 }
463
464 @Override
465 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
466 byte[] value, Delete delete) throws IOException {
467 checkState();
468 return table.checkAndDelete(row, family, qualifier, value, delete);
469 }
470
471 @Override
472 public Result increment(Increment increment) throws IOException {
473 checkState();
474 return table.increment(increment);
475 }
476
477 @Override
478 public long incrementColumnValue(byte[] row, byte[] family,
479 byte[] qualifier, long amount) throws IOException {
480 checkState();
481 return table.incrementColumnValue(row, family, qualifier, amount);
482 }
483
484 @Override
485 public long incrementColumnValue(byte[] row, byte[] family,
486 byte[] qualifier, long amount, Durability durability) throws IOException {
487 checkState();
488 return table.incrementColumnValue(row, family, qualifier, amount,
489 durability);
490 }
491
492 @Override
493 public boolean isAutoFlush() {
494 checkState();
495 return table.isAutoFlush();
496 }
497
498 @Override
499 public void flushCommits() throws IOException {
500 checkState();
501 table.flushCommits();
502 }
503
504
505
506
507
508
509 public void close() throws IOException {
510 checkState();
511 open = false;
512 returnTable(table);
513 }
514
515 @Override
516 public CoprocessorRpcChannel coprocessorService(byte[] row) {
517 checkState();
518 return table.coprocessorService(row);
519 }
520
521 @Override
522 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
523 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
524 throws ServiceException, Throwable {
525 checkState();
526 return table.coprocessorService(service, startKey, endKey, callable);
527 }
528
529 @Override
530 public <T extends Service, R> void coprocessorService(Class<T> service,
531 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
532 throws ServiceException, Throwable {
533 checkState();
534 table.coprocessorService(service, startKey, endKey, callable, callback);
535 }
536
537 @Override
538 public String toString() {
539 return "PooledHTable{" + ", table=" + table + '}';
540 }
541
542
543
544
545
546
547 HTableInterface getWrappedTable() {
548 return table;
549 }
550
551 @Override
552 public <R> void batchCallback(List<? extends Row> actions,
553 Object[] results, Callback<R> callback) throws IOException,
554 InterruptedException {
555 checkState();
556 table.batchCallback(actions, results, callback);
557 }
558
559
560
561
562
563
564
565
566 @Override
567 public <R> Object[] batchCallback(List<? extends Row> actions,
568 Callback<R> callback) throws IOException, InterruptedException {
569 checkState();
570 return table.batchCallback(actions, callback);
571 }
572
573 @Override
574 public void mutateRow(RowMutations rm) throws IOException {
575 checkState();
576 table.mutateRow(rm);
577 }
578
579 @Override
580 public Result append(Append append) throws IOException {
581 checkState();
582 return table.append(append);
583 }
584
585 @Override
586 public void setAutoFlush(boolean autoFlush) {
587 checkState();
588 table.setAutoFlush(autoFlush, autoFlush);
589 }
590
591 @Override
592 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
593 checkState();
594 table.setAutoFlush(autoFlush, clearBufferOnFail);
595 }
596
597 @Override
598 public void setAutoFlushTo(boolean autoFlush) {
599 table.setAutoFlushTo(autoFlush);
600 }
601
602 @Override
603 public long getWriteBufferSize() {
604 checkState();
605 return table.getWriteBufferSize();
606 }
607
608 @Override
609 public void setWriteBufferSize(long writeBufferSize) throws IOException {
610 checkState();
611 table.setWriteBufferSize(writeBufferSize);
612 }
613
614 boolean isOpen() {
615 return open;
616 }
617
618 private void checkState() {
619 if (!isOpen()) {
620 throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
621 }
622 }
623
624 @Override
625 public long incrementColumnValue(byte[] row, byte[] family,
626 byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
627 return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
628 }
629
630 @Override
631 public <R extends Message> Map<byte[], R> batchCoprocessorService(
632 Descriptors.MethodDescriptor method, Message request,
633 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
634 checkState();
635 return table.batchCoprocessorService(method, request, startKey, endKey,
636 responsePrototype);
637 }
638
639 @Override
640 public <R extends Message> void batchCoprocessorService(
641 Descriptors.MethodDescriptor method, Message request,
642 byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
643 throws ServiceException, Throwable {
644 checkState();
645 table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
646 }
647
648 @Override
649 public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
650 byte[] value, RowMutations mutation) throws IOException {
651 checkState();
652 return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
653 }
654 }
655 }