1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.NavigableMap;
30 import java.util.TreeMap;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.SynchronousQueue;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38
39 import com.google.protobuf.InvalidProtocolBufferException;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.hbase.classification.InterfaceAudience;
44 import org.apache.hadoop.hbase.classification.InterfaceStability;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.Cell;
47 import org.apache.hadoop.hbase.DoNotRetryIOException;
48 import org.apache.hadoop.hbase.HBaseConfiguration;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.HRegionLocation;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.KeyValue;
54 import org.apache.hadoop.hbase.KeyValueUtil;
55 import org.apache.hadoop.hbase.ServerName;
56 import org.apache.hadoop.hbase.TableName;
57 import org.apache.hadoop.hbase.client.coprocessor.Batch;
58 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59 import org.apache.hadoop.hbase.filter.BinaryComparator;
60 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
65 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66 import org.apache.hadoop.hbase.protobuf.RequestConverter;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
68 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
69 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
70 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
71 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
72 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
73 import org.apache.hadoop.hbase.util.Bytes;
74 import org.apache.hadoop.hbase.util.Pair;
75 import org.apache.hadoop.hbase.util.Threads;
76
77 import com.google.protobuf.Descriptors;
78 import com.google.protobuf.Message;
79 import com.google.protobuf.Service;
80 import com.google.protobuf.ServiceException;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 @InterfaceAudience.Public
124 @InterfaceStability.Stable
125 public class HTable implements HTableInterface {
126 private static final Log LOG = LogFactory.getLog(HTable.class);
127 protected HConnection connection;
128 private final TableName tableName;
129 private volatile Configuration configuration;
130 private TableConfiguration tableConfiguration;
131 protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
132 private long writeBufferSize;
133 private boolean clearBufferOnFail;
134 private boolean autoFlush;
135 protected long currentWriteBufferSize;
136 protected int scannerCaching;
137 private ExecutorService pool;
138 private boolean closed;
139 private int operationTimeout;
140 private final boolean cleanupPoolOnClose;
141 private final boolean cleanupConnectionOnClose;
142
143
144 protected AsyncProcess<Object> ap;
145 private RpcRetryingCallerFactory rpcCallerFactory;
146 private RpcControllerFactory rpcControllerFactory;
147
148
149
150
151
152
153
154
155
156
157
/**
 * Creates a table handle from a table name given as a string.
 *
 * <p>The name is converted with {@code TableName.valueOf} and construction is delegated to the
 * {@code (Configuration, TableName)} constructor.
 *
 * @param conf configuration passed through to the delegate constructor
 * @param tableName name of the table
 * @throws IOException if the delegate constructor fails
 */
public HTable(Configuration conf, final String tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
162
163
164
165
166
167
168
169
170
171
172
/**
 * Creates a table handle from a table name given as raw bytes.
 *
 * <p>The bytes are converted with {@code TableName.valueOf} and construction is delegated to the
 * {@code (Configuration, TableName)} constructor.
 *
 * @param conf configuration passed through to the delegate constructor
 * @param tableName name of the table, as bytes
 * @throws IOException if the delegate constructor fails
 */
public HTable(Configuration conf, final byte[] tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
177
178
179
180
181
182
183
184
185
186
187
188
189
/**
 * Creates a table handle that obtains its own connection and its own default thread pool.
 *
 * <p>Both cleanup flags are set to {@code true}. When {@code conf} is {@code null} the connection
 * is left {@code null} and no further setup is performed.
 *
 * @param conf configuration used to look up the connection and size the pool; may be null
 * @param tableName name of the table
 * @throws IOException if obtaining the connection or finishing setup fails
 */
public HTable(Configuration conf, final TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupConnectionOnClose = true;
  this.cleanupPoolOnClose = true;

  if (conf != null) {
    this.connection = HConnectionManager.getConnection(conf);
    this.configuration = conf;
    this.pool = getDefaultExecutor(conf);
    this.finishSetup();
  } else {
    // Without a configuration nothing else can be initialised.
    this.connection = null;
  }
}
204
205
206
207
208
209
210
211
212
213 public HTable(TableName tableName, HConnection connection) throws IOException {
214 this.tableName = tableName;
215 this.cleanupPoolOnClose = true;
216 this.cleanupConnectionOnClose = false;
217 this.connection = connection;
218 this.configuration = connection.getConfiguration();
219
220 this.pool = getDefaultExecutor(this.configuration);
221 this.finishSetup();
222 }
223
224 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
225 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
226 if (maxThreads == 0) {
227 maxThreads = 1;
228 }
229 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
230
231
232
233
234
235 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
236 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
237 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
238 return pool;
239 }
240
241
242
243
244
245
246
247
248
249
250
251
252
/**
 * Creates a table handle from a byte-array table name and a caller-supplied executor.
 *
 * <p>Delegates to the {@code (Configuration, TableName, ExecutorService)} constructor.
 *
 * @param conf configuration passed through to the delegate constructor
 * @param tableName name of the table, as bytes
 * @param pool executor to use
 * @throws IOException if the delegate constructor fails
 */
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
    throws IOException {
  this(conf, TableName.valueOf(tableName), pool);
}
257
258
259
260
261
262
263
264
265
266
267
268
269
/**
 * Creates a table handle that obtains its own connection but runs on a caller-supplied executor.
 *
 * <p>The connection cleanup flag is set and the pool cleanup flag is cleared.
 *
 * @param conf configuration used to look up the connection
 * @param tableName name of the table
 * @param pool executor to use
 * @throws IOException if obtaining the connection or finishing setup fails
 */
public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
    throws IOException {
  this.tableName = tableName;
  this.cleanupConnectionOnClose = true;
  this.cleanupPoolOnClose = false;

  this.connection = HConnectionManager.getConnection(conf);
  this.configuration = conf;
  this.pool = pool;

  this.finishSetup();
}
281
282
283
284
285
286
287
288
289
290
291
292
/**
 * Creates a table handle from a byte-array table name, a connection and an executor.
 *
 * <p>Delegates to the {@code (TableName, HConnection, ExecutorService)} constructor.
 *
 * @param tableName name of the table, as bytes
 * @param connection connection to use
 * @param pool executor to use
 * @throws IOException if the delegate constructor fails
 */
public HTable(final byte[] tableName, final HConnection connection, final ExecutorService pool)
    throws IOException {
  this(TableName.valueOf(tableName), connection, pool);
}
297
298
299
300
301
302
303
304
305
306
307
308
/**
 * Creates a table handle from a connection and an executor, leaving the table configuration and
 * both RPC factories unset ({@code null}) for the full constructor to receive.
 *
 * @param tableName name of the table
 * @param connection connection to use
 * @param pool executor to use
 * @throws IOException if the delegate constructor fails
 */
public HTable(TableName tableName, final HConnection connection, final ExecutorService pool)
    throws IOException {
  this(tableName, connection, null, null, null, pool);
}
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328 public HTable(TableName tableName, final HConnection connection,
329 final TableConfiguration tableConfig,
330 final RpcRetryingCallerFactory rpcCallerFactory,
331 final RpcControllerFactory rpcControllerFactory,
332 final ExecutorService pool) throws IOException {
333 if (connection == null || connection.isClosed()) {
334 throw new IllegalArgumentException("Connection is null or closed.");
335 }
336 this.tableName = tableName;
337 this.connection = connection;
338 this.configuration = connection.getConfiguration();
339 this.tableConfiguration = tableConfig;
340 this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
341 this.pool = pool;
342
343 this.rpcCallerFactory = rpcCallerFactory;
344 this.rpcControllerFactory = rpcControllerFactory;
345
346 this.finishSetup();
347 }
348
349
350
351
/**
 * No-argument constructor for subclasses.
 *
 * <p>Leaves the table name {@code null}, installs a default {@code TableConfiguration} and clears
 * both cleanup flags; no connection, pool or async process is set up.
 */
protected HTable() {
  this.tableName = null;
  this.cleanupConnectionOnClose = false;
  this.cleanupPoolOnClose = false;
  this.tableConfiguration = new TableConfiguration();
}
358
359
360
361
362 public static int getMaxKeyValueSize(Configuration conf) {
363 return conf.getInt("hbase.client.keyvalue.maxsize", -1);
364 }
365
366
367
368
369 private void finishSetup() throws IOException {
370 if (tableConfiguration == null) {
371 tableConfiguration = new TableConfiguration(configuration);
372 }
373 this.operationTimeout = tableName.isSystemTable() ?
374 tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
375 this.writeBufferSize = tableConfiguration.getWriteBufferSize();
376 this.clearBufferOnFail = true;
377 this.autoFlush = true;
378 this.currentWriteBufferSize = 0;
379 this.scannerCaching = tableConfiguration.getScannerCaching();
380
381 if (this.rpcCallerFactory == null) {
382 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration,
383 this.connection.getStatisticsTracker());
384 }
385 if (this.rpcControllerFactory == null) {
386 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
387 }
388
389 ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration,
390 rpcCallerFactory, rpcControllerFactory);
391
392 this.closed = false;
393 }
394
395
396
397
398 @Override
399 public Configuration getConfiguration() {
400 return configuration;
401 }
402
403
404
405
406
407
408
409
410
411
412 @Deprecated
413 public static boolean isTableEnabled(String tableName) throws IOException {
414 return isTableEnabled(TableName.valueOf(tableName));
415 }
416
417
418
419
420
421
422
423
424
425
426 @Deprecated
427 public static boolean isTableEnabled(byte[] tableName) throws IOException {
428 return isTableEnabled(TableName.valueOf(tableName));
429 }
430
431
432
433
434
435
436
437
438
439
440 @Deprecated
441 public static boolean isTableEnabled(TableName tableName) throws IOException {
442 return isTableEnabled(HBaseConfiguration.create(), tableName);
443 }
444
445
446
447
448
449
450
451
452
453 @Deprecated
454 public static boolean isTableEnabled(Configuration conf, String tableName)
455 throws IOException {
456 return isTableEnabled(conf, TableName.valueOf(tableName));
457 }
458
459
460
461
462
463
464
465
466
467 @Deprecated
468 public static boolean isTableEnabled(Configuration conf, byte[] tableName)
469 throws IOException {
470 return isTableEnabled(conf, TableName.valueOf(tableName));
471 }
472
473
474
475
476
477
478
479
480
481 @Deprecated
482 public static boolean isTableEnabled(Configuration conf,
483 final TableName tableName) throws IOException {
484 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
485 @Override
486 public Boolean connect(HConnection connection) throws IOException {
487 return connection.isTableEnabled(tableName);
488 }
489 });
490 }
491
492
493
494
495
496
497
498 public HRegionLocation getRegionLocation(final String row)
499 throws IOException {
500 return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
501 }
502
503
504
505
506
507
508
509 public HRegionLocation getRegionLocation(final byte [] row)
510 throws IOException {
511 return connection.getRegionLocation(tableName, row, false);
512 }
513
514
515
516
517
518
519
520
521 public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
522 throws IOException {
523 return connection.getRegionLocation(tableName, row, reload);
524 }
525
526
527
528
529 @Override
530 public byte [] getTableName() {
531 return this.tableName.getName();
532 }
533
534 @Override
535 public TableName getName() {
536 return tableName;
537 }
538
539
540
541
542
543
544
545
546 @Deprecated
547 public HConnection getConnection() {
548 return this.connection;
549 }
550
551
552
553
554
555
556
557 @Deprecated
558 public int getScannerCaching() {
559 return scannerCaching;
560 }
561
562
563
564
565
566 @Deprecated
567 public List<Row> getWriteBuffer() {
568 return writeAsyncBuffer;
569 }
570
571
572
573
574
575
576
577
578
579
580
581
582 @Deprecated
583 public void setScannerCaching(int scannerCaching) {
584 this.scannerCaching = scannerCaching;
585 }
586
587
588
589
590 @Override
591 public HTableDescriptor getTableDescriptor() throws IOException {
592 return new UnmodifyableHTableDescriptor(
593 this.connection.getHTableDescriptor(this.tableName));
594 }
595
596
597
598
599
600
601
602
603 public byte [][] getStartKeys() throws IOException {
604 return getStartEndKeys().getFirst();
605 }
606
607
608
609
610
611
612
613
614 public byte[][] getEndKeys() throws IOException {
615 return getStartEndKeys().getSecond();
616 }
617
618
619
620
621
622
623
624
625
626 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
627 NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
628 final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
629 final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
630
631 for (HRegionInfo region : regions.keySet()) {
632 startKeyList.add(region.getStartKey());
633 endKeyList.add(region.getEndKey());
634 }
635
636 return new Pair<byte [][], byte [][]>(
637 startKeyList.toArray(new byte[startKeyList.size()][]),
638 endKeyList.toArray(new byte[endKeyList.size()][]));
639 }
640
641
642
643
644
645
646
647
648 public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
649
650 return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
651 }
652
653
654
655
656
657
658
659
660
661
662 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
663 final byte [] endKey) throws IOException {
664 return getRegionsInRange(startKey, endKey, false);
665 }
666
667
668
669
670
671
672
673
674
675
676
677 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
678 final byte [] endKey, final boolean reload) throws IOException {
679 return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
680 }
681
682
683
684
685
686
687
688
689
690
691
692
693 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
694 final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
695 throws IOException {
696 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
697 }
698
699
700
701
702
703
704
705
706
707
708
709
710
711 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
712 final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
713 final boolean reload) throws IOException {
714 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
715 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
716 throw new IllegalArgumentException(
717 "Invalid range: " + Bytes.toStringBinary(startKey) +
718 " > " + Bytes.toStringBinary(endKey));
719 }
720 List<byte[]> keysInRange = new ArrayList<byte[]>();
721 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
722 byte[] currentKey = startKey;
723 do {
724 HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
725 keysInRange.add(currentKey);
726 regionsInRange.add(regionLocation);
727 currentKey = regionLocation.getRegionInfo().getEndKey();
728 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
729 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
730 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
731 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
732 regionsInRange);
733 }
734
735
736
737
738 @Override
739 public Result getRowOrBefore(final byte[] row, final byte[] family)
740 throws IOException {
741 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742 tableName, row) {
743 public Result call() throws IOException {
744 return ProtobufUtil.getRowOrBefore(getStub(), getLocation().getRegionInfo()
745 .getRegionName(), row, family, rpcControllerFactory.newController());
746 }
747 };
748 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
749 }
750
751
752
753
754 @Override
755 public ResultScanner getScanner(final Scan scan) throws IOException {
756 if (scan.getBatch() > 0 && scan.isSmall()) {
757 throw new IllegalArgumentException("Small scan should not be used with batching");
758 }
759 if (scan.getCaching() <= 0) {
760 scan.setCaching(getScannerCaching());
761 }
762
763 if (scan.isReversed()) {
764 if (scan.isSmall()) {
765 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
766 this.connection);
767 } else {
768 return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection);
769 }
770 }
771
772 if (scan.isSmall()) {
773 return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
774 } else {
775 return new ClientScanner(getConfiguration(), scan, getName(), this.connection);
776 }
777 }
778
779
780
781
782 @Override
783 public ResultScanner getScanner(byte [] family) throws IOException {
784 Scan scan = new Scan();
785 scan.addFamily(family);
786 return getScanner(scan);
787 }
788
789
790
791
792 @Override
793 public ResultScanner getScanner(byte [] family, byte [] qualifier)
794 throws IOException {
795 Scan scan = new Scan();
796 scan.addColumn(family, qualifier);
797 return getScanner(scan);
798 }
799
800
801
802
803 @Override
804 public Result get(final Get get) throws IOException {
805
806
807
808 final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
809 controller.setPriority(tableName);
810 RegionServerCallable<Result> callable =
811 new RegionServerCallable<Result>(this.connection, getName(), get.getRow()) {
812 public Result call() throws IOException {
813 return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get,
814 controller);
815 }
816 };
817 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
818 }
819
820
821
822
823 @Override
824 public Result[] get(List<Get> gets) throws IOException {
825 if (gets.size() == 1) {
826 return new Result[]{get(gets.get(0))};
827 }
828 try {
829 Object [] r1 = batch((List)gets);
830
831
832 Result [] results = new Result[r1.length];
833 int i=0;
834 for (Object o : r1) {
835
836 results[i++] = (Result) o;
837 }
838
839 return results;
840 } catch (InterruptedException e) {
841 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
842 }
843 }
844
845
846
847
848 @Override
849 public void batch(final List<?extends Row> actions, final Object[] results)
850 throws InterruptedException, IOException {
851 batchCallback(actions, results, null);
852 }
853
854
855
856
857
858
859 @Override
860 public Object[] batch(final List<? extends Row> actions)
861 throws InterruptedException, IOException {
862 return batchCallback(actions, null);
863 }
864
865
866
867
868 @Override
869 public <R> void batchCallback(
870 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
871 throws IOException, InterruptedException {
872 connection.processBatchCallback(actions, tableName, pool, results, callback);
873 }
874
875
876
877
878
879
880
881
882 @Override
883 public <R> Object[] batchCallback(
884 final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
885 InterruptedException {
886 Object[] results = new Object[actions.size()];
887 batchCallback(actions, results, callback);
888 return results;
889 }
890
891
892
893
894 @Override
895 public void delete(final Delete delete)
896 throws IOException {
897 RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
898 tableName, delete.getRow()) {
899 public Boolean call() throws IOException {
900 try {
901 MutateRequest request = RequestConverter.buildMutateRequest(
902 getLocation().getRegionInfo().getRegionName(), delete);
903 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
904 controller.setPriority(tableName);
905 MutateResponse response = getStub().mutate(controller, request);
906 return Boolean.valueOf(response.getProcessed());
907 } catch (ServiceException se) {
908 throw ProtobufUtil.getRemoteException(se);
909 }
910 }
911 };
912 rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
913 }
914
915
916
917
918 @Override
919 public void delete(final List<Delete> deletes)
920 throws IOException {
921 Object[] results = new Object[deletes.size()];
922 try {
923 batch(deletes, results);
924 } catch (InterruptedException e) {
925 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
926 } finally {
927
928
929
930 for (int i = results.length - 1; i>=0; i--) {
931
932 if (results[i] instanceof Result) {
933 deletes.remove(i);
934 }
935 }
936 }
937 }
938
939
940
941
942 @Override
943 public void put(final Put put)
944 throws InterruptedIOException, RetriesExhaustedWithDetailsException {
945 doPut(put);
946 if (autoFlush) {
947 flushCommits();
948 }
949 }
950
951
952
953
954 @Override
955 public void put(final List<Put> puts)
956 throws InterruptedIOException, RetriesExhaustedWithDetailsException {
957 for (Put put : puts) {
958 doPut(put);
959 }
960 if (autoFlush) {
961 flushCommits();
962 }
963 }
964
965
966
967
968
969
970
971
972 private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
973 if (ap.hasError()){
974 writeAsyncBuffer.add(put);
975 backgroundFlushCommits(true);
976 }
977
978 validatePut(put);
979
980 currentWriteBufferSize += put.heapSize();
981 writeAsyncBuffer.add(put);
982
983 while (currentWriteBufferSize > writeBufferSize) {
984 backgroundFlushCommits(false);
985 }
986 }
987
988
989
990
991
992
993
994
995
996 private void backgroundFlushCommits(boolean synchronous) throws
997 InterruptedIOException, RetriesExhaustedWithDetailsException {
998
999 try {
1000 do {
1001 ap.submit(writeAsyncBuffer, true);
1002 } while (synchronous && !writeAsyncBuffer.isEmpty());
1003
1004 if (synchronous) {
1005 ap.waitUntilDone();
1006 }
1007
1008 if (ap.hasError()) {
1009 LOG.debug(tableName + ": One or more of the operations have failed -" +
1010 " waiting for all operation in progress to finish (successfully or not)");
1011 while (!writeAsyncBuffer.isEmpty()) {
1012 ap.submit(writeAsyncBuffer, true);
1013 }
1014 ap.waitUntilDone();
1015
1016 if (!clearBufferOnFail) {
1017
1018
1019 writeAsyncBuffer.addAll(ap.getFailedOperations());
1020 }
1021 RetriesExhaustedWithDetailsException e = ap.getErrors();
1022 ap.clearErrors();
1023 throw e;
1024 }
1025 } finally {
1026 currentWriteBufferSize = 0;
1027 for (Row mut : writeAsyncBuffer) {
1028 if (mut instanceof Mutation) {
1029 currentWriteBufferSize += ((Mutation) mut).heapSize();
1030 }
1031 }
1032 }
1033 }
1034
1035
1036
1037
1038 @Override
1039 public void mutateRow(final RowMutations rm) throws IOException {
1040 RegionServerCallable<Void> callable =
1041 new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1042 public Void call() throws IOException {
1043 try {
1044 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1045 getLocation().getRegionInfo().getRegionName(), rm);
1046 regionMutationBuilder.setAtomic(true);
1047 MultiRequest request =
1048 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1049 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1050 controller.setPriority(tableName);
1051 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1052 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1053 if (res.hasException()) {
1054 Throwable ex = ProtobufUtil.toException(res.getException());
1055 if(ex instanceof IOException) {
1056 throw (IOException)ex;
1057 }
1058 throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1059 }
1060 } catch (ServiceException se) {
1061 throw ProtobufUtil.getRemoteException(se);
1062 }
1063 return null;
1064 }
1065 };
1066 rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1067 }
1068
1069
1070
1071
1072 @Override
1073 public Result append(final Append append) throws IOException {
1074 if (append.numFamilies() == 0) {
1075 throw new IOException(
1076 "Invalid arguments to append, no columns specified");
1077 }
1078
1079 NonceGenerator ng = this.connection.getNonceGenerator();
1080 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1081 RegionServerCallable<Result> callable =
1082 new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1083 public Result call() throws IOException {
1084 try {
1085 MutateRequest request = RequestConverter.buildMutateRequest(
1086 getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1087 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1088 rpcController.setPriority(getTableName());
1089 MutateResponse response = getStub().mutate(rpcController, request);
1090 if (!response.hasResult()) return null;
1091 return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1092 } catch (ServiceException se) {
1093 throw ProtobufUtil.getRemoteException(se);
1094 }
1095 }
1096 };
1097 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1098 }
1099
1100
1101
1102
1103 @Override
1104 public Result increment(final Increment increment) throws IOException {
1105 if (!increment.hasFamilies()) {
1106 throw new IOException(
1107 "Invalid arguments to increment, no columns specified");
1108 }
1109 NonceGenerator ng = this.connection.getNonceGenerator();
1110 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1111 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1112 getName(), increment.getRow()) {
1113 public Result call() throws IOException {
1114 try {
1115 MutateRequest request = RequestConverter.buildMutateRequest(
1116 getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1117 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1118 rpcController.setPriority(getTableName());
1119 MutateResponse response = getStub().mutate(rpcController, request);
1120 return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1121 } catch (ServiceException se) {
1122 throw ProtobufUtil.getRemoteException(se);
1123 }
1124 }
1125 };
1126 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1127 }
1128
1129
1130
1131
1132 @Override
1133 public long incrementColumnValue(final byte [] row, final byte [] family,
1134 final byte [] qualifier, final long amount)
1135 throws IOException {
1136 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1137 }
1138
1139
1140
1141
1142 @Deprecated
1143 @Override
1144 public long incrementColumnValue(final byte [] row, final byte [] family,
1145 final byte [] qualifier, final long amount, final boolean writeToWAL)
1146 throws IOException {
1147 return incrementColumnValue(row, family, qualifier, amount,
1148 writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1149 }
1150
1151
1152
1153
1154 @Override
1155 public long incrementColumnValue(final byte [] row, final byte [] family,
1156 final byte [] qualifier, final long amount, final Durability durability)
1157 throws IOException {
1158 NullPointerException npe = null;
1159 if (row == null) {
1160 npe = new NullPointerException("row is null");
1161 } else if (family == null) {
1162 npe = new NullPointerException("family is null");
1163 } else if (qualifier == null) {
1164 npe = new NullPointerException("qualifier is null");
1165 }
1166 if (npe != null) {
1167 throw new IOException(
1168 "Invalid arguments to incrementColumnValue", npe);
1169 }
1170
1171 NonceGenerator ng = this.connection.getNonceGenerator();
1172 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1173 RegionServerCallable<Long> callable =
1174 new RegionServerCallable<Long>(connection, getName(), row) {
1175 public Long call() throws IOException {
1176 try {
1177 MutateRequest request = RequestConverter.buildIncrementRequest(
1178 getLocation().getRegionInfo().getRegionName(), row, family,
1179 qualifier, amount, durability, nonceGroup, nonce);
1180 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1181 rpcController.setPriority(getTableName());
1182 MutateResponse response = getStub().mutate(rpcController, request);
1183 Result result =
1184 ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1185 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1186 } catch (ServiceException se) {
1187 throw ProtobufUtil.getRemoteException(se);
1188 }
1189 }
1190 };
1191 return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1192 }
1193
1194
1195
1196
1197 @Override
1198 public boolean checkAndPut(final byte [] row,
1199 final byte [] family, final byte [] qualifier, final byte [] value,
1200 final Put put)
1201 throws IOException {
1202 RegionServerCallable<Boolean> callable =
1203 new RegionServerCallable<Boolean>(connection, getName(), row) {
1204 public Boolean call() throws IOException {
1205 try {
1206 MutateRequest request = RequestConverter.buildMutateRequest(
1207 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1208 new BinaryComparator(value), CompareType.EQUAL, put);
1209 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1210 rpcController.setPriority(getTableName());
1211 MutateResponse response = getStub().mutate(rpcController, request);
1212 return Boolean.valueOf(response.getProcessed());
1213 } catch (ServiceException se) {
1214 throw ProtobufUtil.getRemoteException(se);
1215 }
1216 }
1217 };
1218 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1219 }
1220
1221
1222
1223
1224
1225 @Override
1226 public boolean checkAndDelete(final byte [] row,
1227 final byte [] family, final byte [] qualifier, final byte [] value,
1228 final Delete delete)
1229 throws IOException {
1230 RegionServerCallable<Boolean> callable =
1231 new RegionServerCallable<Boolean>(connection, getName(), row) {
1232 public Boolean call() throws IOException {
1233 try {
1234 MutateRequest request = RequestConverter.buildMutateRequest(
1235 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1236 new BinaryComparator(value), CompareType.EQUAL, delete);
1237 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1238 rpcController.setPriority(getTableName());
1239 MutateResponse response = getStub().mutate(rpcController, request);
1240 return Boolean.valueOf(response.getProcessed());
1241 } catch (ServiceException se) {
1242 throw ProtobufUtil.getRemoteException(se);
1243 }
1244 }
1245 };
1246 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1247 }
1248
1249
1250
1251
1252 @Override
1253 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1254 final CompareOp compareOp, final byte [] value, final RowMutations rm)
1255 throws IOException {
1256 RegionServerCallable<Boolean> callable =
1257 new RegionServerCallable<Boolean>(connection, getName(), row) {
1258 @Override
1259 public Boolean call() throws IOException {
1260 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1261 controller.setPriority(tableName);
1262 try {
1263 CompareType compareType = CompareType.valueOf(compareOp.name());
1264 MultiRequest request = RequestConverter.buildMutateRequest(
1265 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1266 new BinaryComparator(value), compareType, rm);
1267 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1268 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1269 if (res.hasException()) {
1270 Throwable ex = ProtobufUtil.toException(res.getException());
1271 if(ex instanceof IOException) {
1272 throw (IOException)ex;
1273 }
1274 throw new IOException("Failed to checkAndMutate row: "+
1275 Bytes.toStringBinary(rm.getRow()), ex);
1276 }
1277 return Boolean.valueOf(response.getProcessed());
1278 } catch (ServiceException se) {
1279 throw ProtobufUtil.getRemoteException(se);
1280 }
1281 }
1282 };
1283 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1284 }
1285
1286
1287
1288
1289 @Override
1290 public boolean exists(final Get get) throws IOException {
1291 get.setCheckExistenceOnly(true);
1292 Result r = get(get);
1293 assert r.getExists() != null;
1294 return r.getExists();
1295 }
1296
1297
1298
1299
1300 @Override
1301 public Boolean[] exists(final List<Get> gets) throws IOException {
1302 if (gets.isEmpty()) return new Boolean[]{};
1303 if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1304
1305 for (Get g: gets){
1306 g.setCheckExistenceOnly(true);
1307 }
1308
1309 Object[] r1;
1310 try {
1311 r1 = batch(gets);
1312 } catch (InterruptedException e) {
1313 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1314 }
1315
1316
1317 Boolean[] results = new Boolean[r1.length];
1318 int i = 0;
1319 for (Object o : r1) {
1320
1321 results[i++] = ((Result)o).getExists();
1322 }
1323
1324 return results;
1325 }
1326
1327
1328
1329
1330 @Override
1331 public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1332
1333
1334 backgroundFlushCommits(true);
1335 }
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348 public <R> void processBatchCallback(
1349 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1350 throws IOException, InterruptedException {
1351 this.batchCallback(list, results, callback);
1352 }
1353
1354
1355
1356
1357
1358
1359 public void processBatch(final List<? extends Row> list, final Object[] results)
1360 throws IOException, InterruptedException {
1361
1362 this.processBatchCallback(list, results, null);
1363 }
1364
1365
1366 @Override
1367 public void close() throws IOException {
1368 if (this.closed) {
1369 return;
1370 }
1371 flushCommits();
1372 if (cleanupPoolOnClose) {
1373 this.pool.shutdown();
1374 }
1375 if (cleanupConnectionOnClose) {
1376 if (this.connection != null) {
1377 this.connection.close();
1378 }
1379 }
1380 this.closed = true;
1381 }
1382
1383
1384 public void validatePut(final Put put) throws IllegalArgumentException {
1385 validatePut(put, tableConfiguration.getMaxKeyValueSize());
1386 }
1387
1388
1389 public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1390 if (put.isEmpty()) {
1391 throw new IllegalArgumentException("No columns to insert");
1392 }
1393 if (maxKeyValueSize > 0) {
1394 for (List<Cell> list : put.getFamilyCellMap().values()) {
1395 for (Cell cell : list) {
1396
1397 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1398 if (kv.getLength() > maxKeyValueSize) {
1399 throw new IllegalArgumentException("KeyValue size too large");
1400 }
1401 }
1402 }
1403 }
1404 }
1405
1406
1407
1408
1409 @Override
1410 public boolean isAutoFlush() {
1411 return autoFlush;
1412 }
1413
1414
1415
1416
1417 @Deprecated
1418 @Override
1419 public void setAutoFlush(boolean autoFlush) {
1420 setAutoFlush(autoFlush, autoFlush);
1421 }
1422
1423
1424
1425
1426 @Override
1427 public void setAutoFlushTo(boolean autoFlush) {
1428 setAutoFlush(autoFlush, clearBufferOnFail);
1429 }
1430
1431
1432
1433
1434 @Override
1435 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1436 this.autoFlush = autoFlush;
1437 this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1438 }
1439
1440
1441
1442
1443
1444
1445
1446
1447 @Override
1448 public long getWriteBufferSize() {
1449 return writeBufferSize;
1450 }
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460 public void setWriteBufferSize(long writeBufferSize) throws IOException {
1461 this.writeBufferSize = writeBufferSize;
1462 if(currentWriteBufferSize > writeBufferSize) {
1463 flushCommits();
1464 }
1465 }
1466
1467
1468
1469
1470
1471 ExecutorService getPool() {
1472 return this.pool;
1473 }
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484 public static void setRegionCachePrefetch(final byte[] tableName,
1485 final boolean enable) throws IOException {
1486 setRegionCachePrefetch(TableName.valueOf(tableName), enable);
1487 }
1488
1489 public static void setRegionCachePrefetch(
1490 final TableName tableName,
1491 final boolean enable) throws IOException {
1492 HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration.create()) {
1493 @Override
1494 public Void connect(HConnection connection) throws IOException {
1495 connection.setRegionCachePrefetch(tableName, enable);
1496 return null;
1497 }
1498 });
1499 }
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511 public static void setRegionCachePrefetch(final Configuration conf,
1512 final byte[] tableName, final boolean enable) throws IOException {
1513 setRegionCachePrefetch(conf, TableName.valueOf(tableName), enable);
1514 }
1515
1516 public static void setRegionCachePrefetch(final Configuration conf,
1517 final TableName tableName,
1518 final boolean enable) throws IOException {
1519 HConnectionManager.execute(new HConnectable<Void>(conf) {
1520 @Override
1521 public Void connect(HConnection connection) throws IOException {
1522 connection.setRegionCachePrefetch(tableName, enable);
1523 return null;
1524 }
1525 });
1526 }
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536 public static boolean getRegionCachePrefetch(final Configuration conf,
1537 final byte[] tableName) throws IOException {
1538 return getRegionCachePrefetch(conf, TableName.valueOf(tableName));
1539 }
1540
1541 public static boolean getRegionCachePrefetch(final Configuration conf,
1542 final TableName tableName) throws IOException {
1543 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1544 @Override
1545 public Boolean connect(HConnection connection) throws IOException {
1546 return connection.getRegionCachePrefetch(tableName);
1547 }
1548 });
1549 }
1550
1551
1552
1553
1554
1555
1556
1557
1558 public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1559 return getRegionCachePrefetch(TableName.valueOf(tableName));
1560 }
1561
1562 public static boolean getRegionCachePrefetch(
1563 final TableName tableName) throws IOException {
1564 return HConnectionManager.execute(new HConnectable<Boolean>(
1565 HBaseConfiguration.create()) {
1566 @Override
1567 public Boolean connect(HConnection connection) throws IOException {
1568 return connection.getRegionCachePrefetch(tableName);
1569 }
1570 });
1571 }
1572
1573
1574
1575
1576
1577 public void clearRegionCache() {
1578 this.connection.clearRegionCache();
1579 }
1580
1581
1582
1583
1584 public CoprocessorRpcChannel coprocessorService(byte[] row) {
1585 return new RegionCoprocessorRpcChannel(connection, tableName, row, rpcCallerFactory,
1586 rpcControllerFactory);
1587 }
1588
1589
1590
1591
1592 @Override
1593 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1594 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1595 throws ServiceException, Throwable {
1596 final Map<byte[],R> results = Collections.synchronizedMap(
1597 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1598 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1599 public void update(byte[] region, byte[] row, R value) {
1600 if (region != null) {
1601 results.put(region, value);
1602 }
1603 }
1604 });
1605 return results;
1606 }
1607
1608
1609
1610
1611 @Override
1612 public <T extends Service, R> void coprocessorService(final Class<T> service,
1613 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1614 final Batch.Callback<R> callback) throws ServiceException, Throwable {
1615
1616
1617 List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1618
1619 Map<byte[],Future<R>> futures =
1620 new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1621 for (final byte[] r : keys) {
1622 final RegionCoprocessorRpcChannel channel =
1623 new RegionCoprocessorRpcChannel(connection, tableName, r, rpcCallerFactory,
1624 rpcControllerFactory);
1625 Future<R> future = pool.submit(
1626 new Callable<R>() {
1627 public R call() throws Exception {
1628 T instance = ProtobufUtil.newServiceStub(service, channel);
1629 R result = callable.call(instance);
1630 byte[] region = channel.getLastRegion();
1631 if (callback != null) {
1632 callback.update(region, r, result);
1633 }
1634 return result;
1635 }
1636 });
1637 futures.put(r, future);
1638 }
1639 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1640 try {
1641 e.getValue().get();
1642 } catch (ExecutionException ee) {
1643 LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1644 + Bytes.toStringBinary(e.getKey()), ee);
1645 throw ee.getCause();
1646 } catch (InterruptedException ie) {
1647 throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1648 + " for row " + Bytes.toStringBinary(e.getKey()))
1649 .initCause(ie);
1650 }
1651 }
1652 }
1653
1654 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1655 throws IOException {
1656 if (start == null) {
1657 start = HConstants.EMPTY_START_ROW;
1658 }
1659 if (end == null) {
1660 end = HConstants.EMPTY_END_ROW;
1661 }
1662 return getKeysAndRegionsInRange(start, end, true).getFirst();
1663 }
1664
1665 public void setOperationTimeout(int operationTimeout) {
1666 this.operationTimeout = operationTimeout;
1667 }
1668
1669 public int getOperationTimeout() {
1670 return operationTimeout;
1671 }
1672
1673 @Override
1674 public String toString() {
1675 return tableName + ";" + connection;
1676 }
1677
1678
1679
1680
1681
1682
1683 public static void main(String[] args) throws IOException {
1684 HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1685 try {
1686 System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1687 } finally {
1688 t.close();
1689 }
1690 }
1691
1692
1693
1694
1695 @Override
1696 public <R extends Message> Map<byte[], R> batchCoprocessorService(
1697 Descriptors.MethodDescriptor methodDescriptor, Message request,
1698 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1699 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1700 Bytes.BYTES_COMPARATOR));
1701 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1702 new Callback<R>() {
1703
1704 @Override
1705 public void update(byte[] region, byte[] row, R result) {
1706 if (region != null) {
1707 results.put(region, result);
1708 }
1709 }
1710 });
1711 return results;
1712 }
1713
1714
1715
1716
1717 @Override
1718 public <R extends Message> void batchCoprocessorService(
1719 final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1720 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1721 throws ServiceException, Throwable {
1722
1723
1724 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1725 getKeysAndRegionsInRange(startKey, endKey, true);
1726 List<byte[]> keys = keysAndRegions.getFirst();
1727 List<HRegionLocation> regions = keysAndRegions.getSecond();
1728
1729
1730 if (keys.isEmpty()) {
1731 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1732 ", end=" + Bytes.toStringBinary(endKey));
1733 return;
1734 }
1735
1736 List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1737 final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1738 new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1739 for (int i = 0; i < keys.size(); i++) {
1740 final byte[] rowKey = keys.get(i);
1741 final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1742 RegionCoprocessorServiceExec exec =
1743 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1744 execs.add(exec);
1745 execsByRow.put(rowKey, exec);
1746 }
1747
1748
1749
1750 final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1751 final List<Row> callbackErrorActions = new ArrayList<Row>();
1752 final List<String> callbackErrorServers = new ArrayList<String>();
1753
1754 AsyncProcess<ClientProtos.CoprocessorServiceResult> asyncProcess =
1755 new AsyncProcess<ClientProtos.CoprocessorServiceResult>(connection, tableName, pool,
1756 new AsyncProcess.AsyncProcessCallback<ClientProtos.CoprocessorServiceResult>() {
1757 @SuppressWarnings("unchecked")
1758 @Override
1759 public void success(int originalIndex, byte[] region, Row row,
1760 ClientProtos.CoprocessorServiceResult serviceResult) {
1761 if (LOG.isTraceEnabled()) {
1762 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1763 " call #" + originalIndex + ": region=" + Bytes.toStringBinary(region) +
1764 ", row=" + Bytes.toStringBinary(row.getRow()) +
1765 ", value=" + serviceResult.getValue().getValue());
1766 }
1767 try {
1768 callback.update(region, row.getRow(),
1769 (R) responsePrototype.newBuilderForType().mergeFrom(
1770 serviceResult.getValue().getValue()).build());
1771 } catch (InvalidProtocolBufferException e) {
1772 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1773 e);
1774 callbackErrorExceptions.add(e);
1775 callbackErrorActions.add(row);
1776 callbackErrorServers.add("null");
1777 }
1778 }
1779
1780 @Override
1781 public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
1782 RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1783 LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1784 + Bytes.toStringBinary(exec.getRegion()), t);
1785 return true;
1786 }
1787
1788 @Override
1789 public boolean retriableFailure(int originalIndex, Row row, byte[] region,
1790 Throwable exception) {
1791 RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1792 LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1793 + Bytes.toStringBinary(exec.getRegion()), exception);
1794 return !(exception instanceof DoNotRetryIOException);
1795 }
1796 },
1797 configuration, rpcCallerFactory, rpcControllerFactory);
1798
1799 asyncProcess.submitAll(execs);
1800 asyncProcess.waitUntilDone();
1801
1802 if (asyncProcess.hasError()) {
1803 throw asyncProcess.getErrors();
1804 } else if (!callbackErrorExceptions.isEmpty()) {
1805 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1806 callbackErrorServers);
1807 }
1808 }
1809 }