1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.ConcurrentSkipListMap;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.DoNotRetryIOException;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
50 import org.apache.hadoop.hbase.client.coprocessor.Batch;
51 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54 import org.apache.hadoop.hbase.util.Pair;
55 import org.cloudera.htrace.Trace;
56
57 import com.google.common.base.Preconditions;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 class AsyncProcess<CResult> {
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);

/**
 * Configuration key for the attempt count that gates retry logging. The constructor reads it
 * into {@code startLogErrorsCnt}; retries are only logged at INFO once the attempt number
 * exceeds that value.
 */
public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
    "hbase.client.start.log.errors.counter";
/** Value used when {@link #START_LOG_ERRORS_AFTER_COUNT_KEY} is absent from the configuration. */
public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;

/** Shared sequence from which each instance takes its {@link #id} in the constructor. */
protected static final AtomicLong COUNTER = new AtomicLong();
/** Instance number; appears as the "#id" prefix of log and exception messages. */
protected final long id;
/** Attempt threshold for retry logging, read from {@link #START_LOG_ERRORS_AFTER_COUNT_KEY}. */
private final int startLogErrorsCnt;
/** Connection used to locate regions, refresh cached locations and obtain nonces/statistics. */
protected final HConnection hConnection;
/** Table all submitted rows are resolved against. */
protected final TableName tableName;
/** Executor the per-server runnables are handed to. */
protected final ExecutorService pool;
/** Optional per-operation listener; every use in this class is guarded by a null check. */
protected final AsyncProcessCallback<CResult> callback;
/** Failures that will not be retried, recorded by {@code manageError}. */
protected final BatchErrors errors = new BatchErrors();
/** Set to true when a non-retried failure is recorded; reset by {@code clearErrors()}. */
protected final AtomicBoolean hasError = new AtomicBoolean(false);
/** Incremented once per call to {@code incTaskCounters}. */
protected final AtomicLong tasksSent = new AtomicLong(0);
/**
 * Incremented once per call to {@code decTaskCounters}. The object also serves as the monitor
 * on which waiting threads block and are notified.
 */
protected final AtomicLong tasksDone = new AtomicLong(0);
/** Incremented each time the private resubmission path runs with an attempt number above 1. */
protected final AtomicLong retriesCnt = new AtomicLong(0);
/** In-flight task count per region name; keys are compared with {@code Bytes.BYTES_COMPARATOR}. */
protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
    new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
/** In-flight task count per server. */
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
    new ConcurrentHashMap<ServerName, AtomicInteger>();
/** RPC timeout read from {@code HConstants.HBASE_RPC_TIMEOUT_KEY}; passed to each call. */
protected final int timeout;

/**
 * Upper bound on tasks in flight across all servers. Read from configuration and required to
 * be strictly positive by the constructor.
 */
protected final int maxTotalConcurrentTasks;

/**
 * Upper bound on tasks in flight for one region. Read from configuration and required to be
 * strictly positive by the constructor.
 */
protected final int maxConcurrentTasksPerRegion;

/**
 * Upper bound on tasks in flight for one server. Read from configuration and required to be
 * strictly positive by the constructor.
 */
protected final int maxConcurrentTasksPerServer;
/** Base pause read from {@code HConstants.HBASE_CLIENT_PAUSE}; used to compute backoff. */
protected final long pause;
/** Maximum number of attempts, read from {@code HConstants.HBASE_CLIENT_RETRIES_NUMBER}. */
protected int numTries;
/** Sum of the pause times of all attempts; handed to each new server error tracker. */
protected int serverTrackerTimeout;
/** Factory for the caller that executes each multi-server call. */
protected RpcRetryingCallerFactory rpcCallerFactory;
/** Controller factory passed to each {@code MultiServerCallable}; never null after construction. */
private RpcControllerFactory rpcFactory;
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/**
 * Listener notified of the outcome of each individual operation.
 *
 * NOTE: {@link #failure} takes {@code (region, row)} while {@link #retriableFailure} takes
 * {@code (row, region)}; the argument order differs between the two methods.
 */
interface AsyncProcessCallback<CResult> {

  /**
   * Called when an operation succeeded.
   *
   * @param originalIndex index of the operation in the submitted list
   * @param region key under which the server response reported the result
   * @param row the operation
   * @param result the value returned for the operation
   */
  void success(int originalIndex, byte[] region, Row row, CResult result);

  /**
   * Called when an operation failed and will not be retried.
   *
   * @param originalIndex index of the operation in the submitted list
   * @param region encoded region name, or null when no location is known
   * @param row the operation
   * @param t the failure; may be null
   * @return a boolean; the caller visible in this class ignores it
   */
  boolean failure(int originalIndex, byte[] region, Row row, Throwable t);

  /**
   * Called when an operation failed and a retry is still possible.
   *
   * @param originalIndex index of the operation in the submitted list
   * @param row the operation
   * @param region encoded region name, or null when no location is known
   * @param exception the failure; may be null
   * @return true to retry the operation, false to treat the failure as final
   */
  boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception);
}
188
189 private static class BatchErrors {
190 private final List<Throwable> throwables = new ArrayList<Throwable>();
191 private final List<Row> actions = new ArrayList<Row>();
192 private final List<String> addresses = new ArrayList<String>();
193
194 public synchronized void add(Throwable ex, Row row, HRegionLocation location) {
195 if (row == null){
196 throw new IllegalArgumentException("row cannot be null. location=" + location);
197 }
198
199 throwables.add(ex);
200 actions.add(row);
201 addresses.add(location != null ? location.getServerName().toString() : "null location");
202 }
203
204 private synchronized RetriesExhaustedWithDetailsException makeException() {
205 return new RetriesExhaustedWithDetailsException(
206 new ArrayList<Throwable>(throwables),
207 new ArrayList<Row>(actions), new ArrayList<String>(addresses));
208 }
209
210 public synchronized void clear() {
211 throwables.clear();
212 actions.clear();
213 addresses.clear();
214 }
215 }
216
/**
 * Creates a processor bound to one connection and one table.
 *
 * @param hc connection used for region lookups; must not be null
 * @param tableName table the operations target
 * @param pool executor that runs the per-server tasks
 * @param callback optional per-operation listener; may be null
 * @param conf source of the pause, retry, timeout and concurrency settings
 * @param rpcCaller factory for the caller executing each multi-server call
 * @param rpcFactory controller factory; must not be null
 * @throws IllegalArgumentException if {@code hc} is null or any of the three concurrency
 *         limits is not strictly positive
 * @throws NullPointerException if {@code rpcFactory} is null
 */
public AsyncProcess(HConnection hc, TableName tableName, ExecutorService pool,
    AsyncProcessCallback<CResult> callback, Configuration conf,
    RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
  if (hc == null){
    throw new IllegalArgumentException("HConnection cannot be null.");
  }

  this.hConnection = hc;
  this.tableName = tableName;
  this.pool = pool;
  this.callback = callback;

  this.id = COUNTER.incrementAndGet();

  this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

  this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
  this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
  this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

  this.startLogErrorsCnt =
      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);

  if (this.maxTotalConcurrentTasks <= 0) {
    throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
  }
  if (this.maxConcurrentTasksPerServer <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
        maxConcurrentTasksPerServer);
  }
  if (this.maxConcurrentTasksPerRegion <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
        maxConcurrentTasksPerRegion);
  }

  // The tracker timeout is the total pause over all attempts. It is accumulated in a long
  // because "int += long" narrows silently and could wrap to a negative timeout for large
  // pause/retry settings. The sum is clamped to the int range instead, and the loop stops as
  // soon as the clamp is reached since further terms cannot change the result.
  long totalPause = 0;
  for (int i = 0; i < this.numTries; ++i) {
    totalPause += ConnectionUtils.getPauseTime(this.pause, i);
    if (totalPause >= Integer.MAX_VALUE) {
      totalPause = Integer.MAX_VALUE;
      break;
    }
  }
  this.serverTrackerTimeout = (int) totalPause;

  this.rpcCallerFactory = rpcCaller;
  Preconditions.checkNotNull(rpcFactory);
  this.rpcFactory = rpcFactory;
}
277
278
279
280
281
282
283
284
285 public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
286 submit(rows, atLeastOne, null);
287 }
288
289
290
291
292
293
294
295
296
297 public void submit(List<? extends Row> rows, boolean atLeastOne,
298 Batch.Callback<CResult> batchCallback) throws InterruptedIOException {
299 if (rows.isEmpty()) {
300 return;
301 }
302
303
304
305 Map<HRegionLocation, MultiAction<Row>> actionsByServer =
306 new HashMap<HRegionLocation, MultiAction<Row>>();
307 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
308
309 long currentTaskCnt = tasksDone.get();
310 boolean alreadyLooped = false;
311
312 NonceGenerator ng = this.hConnection.getNonceGenerator();
313 do {
314 if (alreadyLooped){
315
316 waitForNextTaskDone(currentTaskCnt);
317 currentTaskCnt = tasksDone.get();
318 } else {
319 alreadyLooped = true;
320 }
321
322
323 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
324
325
326
327 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
328 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
329
330 int posInList = -1;
331 Iterator<? extends Row> it = rows.iterator();
332 while (it.hasNext()) {
333 Row r = it.next();
334 HRegionLocation loc = findDestLocation(r, posInList);
335
336 if (loc == null) {
337 it.remove();
338 } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
339 Action<Row> action = new Action<Row>(r, ++posInList);
340 setNonce(ng, r, action);
341 retainedActions.add(action);
342 addAction(loc, action, actionsByServer, ng);
343 it.remove();
344 }
345 }
346 } while (retainedActions.isEmpty() && atLeastOne && !hasError());
347
348 HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
349 sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, batchCallback);
350 }
351
352
353
354
355
356
357
358
359
360 private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
361 MultiAction<Row>> actionsByServer, NonceGenerator ng) {
362 final byte[] regionName = loc.getRegionInfo().getRegionName();
363 MultiAction<Row> multiAction = actionsByServer.get(loc);
364 if (multiAction == null) {
365 multiAction = new MultiAction<Row>();
366 actionsByServer.put(loc, multiAction);
367 }
368 if (action.hasNonce() && !multiAction.hasNonceGroup()) {
369
370
371 multiAction.setNonceGroup(ng.getNonceGroup());
372 }
373
374 multiAction.add(regionName, action);
375 }
376
377
378
379
380
381
382
383
384 private HRegionLocation findDestLocation(Row row, int posInList) {
385 if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
386 HRegionLocation loc = null;
387 IOException locationException = null;
388 try {
389 loc = hConnection.locateRegion(this.tableName, row.getRow());
390 if (loc == null) {
391 locationException = new IOException("#" + id + ", no location found, aborting submit for" +
392 " tableName=" + tableName +
393 " rowkey=" + Arrays.toString(row.getRow()));
394 }
395 } catch (IOException e) {
396 locationException = e;
397 }
398 if (locationException != null) {
399
400
401 manageError(posInList, row, false, locationException, null);
402 return null;
403 }
404
405 return loc;
406 }
407
408
409
410
411
412
413
414
415
416 protected boolean canTakeOperation(HRegionLocation loc,
417 Map<Long, Boolean> regionsIncluded,
418 Map<ServerName, Boolean> serversIncluded) {
419 long regionId = loc.getRegionInfo().getRegionId();
420 Boolean regionPrevious = regionsIncluded.get(regionId);
421
422 if (regionPrevious != null) {
423
424 return regionPrevious;
425 }
426
427 Boolean serverPrevious = serversIncluded.get(loc.getServerName());
428 if (Boolean.FALSE.equals(serverPrevious)) {
429
430 regionsIncluded.put(regionId, Boolean.FALSE);
431 return false;
432 }
433
434 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
435 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
436
437 regionsIncluded.put(regionId, Boolean.FALSE);
438 return false;
439 }
440
441 if (serverPrevious == null) {
442
443 int newServers = 0;
444 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
445 if (kv.getValue()) {
446 newServers++;
447 }
448 }
449
450
451 boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;
452
453 if (ok) {
454
455 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
456 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
457 }
458
459 if (!ok) {
460 regionsIncluded.put(regionId, Boolean.FALSE);
461 serversIncluded.put(loc.getServerName(), Boolean.FALSE);
462 return false;
463 }
464
465 serversIncluded.put(loc.getServerName(), Boolean.TRUE);
466 } else {
467 assert serverPrevious.equals(Boolean.TRUE);
468 }
469
470 regionsIncluded.put(regionId, Boolean.TRUE);
471
472 return true;
473 }
474
475
476
477
478
479
480
481 public void submitAll(List<? extends Row> rows) {
482 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
483
484
485 int posInList = -1;
486 NonceGenerator ng = this.hConnection.getNonceGenerator();
487 for (Row r : rows) {
488 posInList++;
489 if (r instanceof Put) {
490 Put put = (Put) r;
491 if (put.isEmpty()) {
492 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
493 }
494 }
495 Action<Row> action = new Action<Row>(r, posInList);
496 setNonce(ng, r, action);
497 actions.add(action);
498 }
499 HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
500 submit(actions, actions, 1, errorsByServer);
501 }
502
503 private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
504 if (!(r instanceof Append) && !(r instanceof Increment)) return;
505 action.setNonce(ng.newNonce());
506 }
507
508
509
510
511
512
513
514
515
516
517 private void submit(List<Action<Row>> initialActions,
518 List<Action<Row>> currentActions, int numAttempt,
519 final HConnectionManager.ServerErrorTracker errorsByServer) {
520
521 if (numAttempt > 1){
522 retriesCnt.incrementAndGet();
523 }
524
525
526 final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
527 new HashMap<HRegionLocation, MultiAction<Row>>();
528
529 NonceGenerator ng = this.hConnection.getNonceGenerator();
530 for (Action<Row> action : currentActions) {
531 HRegionLocation loc = findDestLocation(action.getAction(), action.getOriginalIndex());
532 if (loc != null) {
533 addAction(loc, action, actionsByServer, ng);
534 }
535 }
536
537 if (!actionsByServer.isEmpty()) {
538 sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer, null);
539 }
540 }
541
542
543
544
545
546
547
548
549
550 public void sendMultiAction(final List<Action<Row>> initialActions,
551 Map<HRegionLocation, MultiAction<Row>> actionsByServer,
552 final int numAttempt,
553 final HConnectionManager.ServerErrorTracker errorsByServer,
554 Batch.Callback<CResult> batchCallback) {
555
556
557 for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
558 HRegionLocation loc = e.getKey();
559 MultiAction<Row> multiAction = e.getValue();
560 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(initialActions, loc,
561 multiAction, numAttempt, errorsByServer, batchCallback);
562 for (Runnable runnable: runnables) {
563 try {
564 incTaskCounters(multiAction.getRegions(), loc.getServerName());
565 this.pool.submit(runnable);
566 } catch (RejectedExecutionException ree) {
567
568
569 decTaskCounters(multiAction.getRegions(), loc.getServerName());
570 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
571 " Server is " + loc.getServerName(), ree);
572
573
574 receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
575 }
576 }
577 }
578 }
579
580 private Runnable getNewSingleServerRunnable(
581 final List<Action<Row>> initialActions,
582 final HRegionLocation loc,
583 final MultiAction<Row> multiAction,
584 final int numAttempt,
585 final HConnectionManager.ServerErrorTracker errorsByServer,
586 final Batch.Callback<CResult> batchCallback) {
587 return new Runnable() {
588 @Override
589 public void run() {
590 MultiResponse res;
591 try {
592 MultiServerCallable<Row> callable = createCallable(loc, multiAction);
593 try {
594 res = createCaller(callable).callWithoutRetries(callable, timeout);
595 } catch (IOException e) {
596
597
598 receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
599 errorsByServer);
600 return;
601 } catch (Throwable t) {
602
603 LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
604 " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
605 receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
606 errorsByServer);
607 return;
608 }
609
610
611 receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer,
612 batchCallback);
613
614 } finally {
615 decTaskCounters(multiAction.getRegions(), loc.getServerName());
616 }
617 }
618 };
619 }
620
621 private Collection<? extends Runnable> getNewMultiActionRunnable(
622 final List<Action<Row>> initialActions,
623 final HRegionLocation loc,
624 final MultiAction<Row> multiAction,
625 final int numAttempt,
626 final HConnectionManager.ServerErrorTracker errorsByServer,
627 final Batch.Callback<CResult> batchCallback) {
628
629 if (AsyncProcess.this.hConnection.getStatisticsTracker() == null) {
630 List<Runnable> toReturn = new ArrayList<Runnable>(1);
631 toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction",
632 getNewSingleServerRunnable(initialActions, loc, multiAction, numAttempt,
633 errorsByServer, batchCallback)));
634 return toReturn;
635 } else {
636
637 Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
638 .size());
639
640
641 for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
642 Long backoff = getBackoff(loc);
643 DelayingRunner runner = actions.get(backoff);
644 if (runner == null) {
645 actions.put(backoff, new DelayingRunner(backoff, e));
646 } else {
647 runner.add(e);
648 }
649 }
650
651 List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
652 for (DelayingRunner runner : actions.values()) {
653 String traceText = "AsyncProcess.sendMultiAction";
654 Runnable runnable = getNewSingleServerRunnable(initialActions, loc, runner.getActions(),
655 numAttempt, errorsByServer, batchCallback);
656
657 if (runner.getSleepTime() > 0) {
658 runner.setRunner(runnable);
659 traceText = "AsyncProcess.clientBackoff.sendMultiAction";
660 runnable = runner;
661 }
662 runnable = Trace.wrap(traceText, runnable);
663 toReturn.add(runnable);
664 }
665 return toReturn;
666 }
667 }
668
669
670
671
672
673
674
675 private Long getBackoff(HRegionLocation location) {
676 ServerStatisticTracker tracker = AsyncProcess.this.hConnection.getStatisticsTracker();
677 ServerStatistics stats = tracker.getStats(location.getServerName());
678 return AsyncProcess.this.hConnection.getBackoffPolicy()
679 .getBackoffTime(location.getServerName(), location.getRegionInfo().getRegionName(),
680 stats);
681 }
682
683
684
685
686 protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
687 final MultiAction<Row> multi) {
688 return new MultiServerCallable<Row>(hConnection, tableName, location, this.rpcFactory, multi);
689 }
690
691
692
693
694
695
696 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
697 return rpcCallerFactory.<MultiResponse> newCaller();
698 }
699
700
701
702
703
704
705
706
707
708
709
710 private boolean manageError(int originalIndex, Row row, boolean canRetry,
711 Throwable throwable, HRegionLocation location) {
712 if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
713 canRetry = false;
714 }
715
716 byte[] region = null;
717 if (canRetry && callback != null) {
718 region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
719 canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
720 }
721
722 if (!canRetry) {
723 if (callback != null) {
724 if (region == null && location != null) {
725 region = location.getRegionInfo().getEncodedNameAsBytes();
726 }
727 callback.failure(originalIndex, region, row, throwable);
728 }
729 errors.add(throwable, row, location);
730 this.hasError.set(true);
731 }
732
733 return canRetry;
734 }
735
736
737
738
739
740
741
742
743
744
745 private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
746 HRegionLocation location, int numAttempt, Throwable t,
747 HConnectionManager.ServerErrorTracker errorsByServer) {
748
749
750 hConnection.updateCachedLocations(tableName,
751 rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
752 errorsByServer.reportServerError(location);
753 boolean canRetry = errorsByServer.canRetryMore(numAttempt);
754
755 List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
756 for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
757 for (Action<Row> action : e.getValue()) {
758 if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, location)) {
759 toReplay.add(action);
760 }
761 }
762 }
763
764 logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
765 t, errorsByServer);
766 }
767
768
769
770
771
772 private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
773 List<Action<Row>> toReplay, int numAttempt, int failureCount,
774 Throwable throwable,
775 HConnectionManager.ServerErrorTracker errorsByServer) {
776 if (toReplay.isEmpty()) {
777
778 if (failureCount != 0) {
779
780 LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
781 oldLocation.getServerName(), throwable, -1, false,
782 errorsByServer.getStartTrackingTime()));
783 } else if (numAttempt > startLogErrorsCnt + 1) {
784
785 LOG.info(createLog(numAttempt, failureCount, 0,
786 oldLocation.getServerName(), throwable, -1, false,
787 errorsByServer.getStartTrackingTime()));
788 }
789 return;
790 }
791
792
793
794
795
796
797
798
799 long backOffTime = errorsByServer.calculateBackoffTime(oldLocation, pause);
800
801 if (numAttempt > startLogErrorsCnt) {
802
803
804 LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
805 oldLocation.getServerName(), throwable, backOffTime, true,
806 errorsByServer.getStartTrackingTime()));
807 }
808
809 try {
810 Thread.sleep(backOffTime);
811 } catch (InterruptedException e) {
812 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldLocation, e);
813 Thread.currentThread().interrupt();
814 return;
815 }
816
817 submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
818 }
819
820
821
822
823
824
825
826
827
828
829 private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
830 HRegionLocation location,
831 MultiResponse responses, int numAttempt,
832 HConnectionManager.ServerErrorTracker errorsByServer,
833 Batch.Callback<CResult> batchCallback) {
834 assert responses != null;
835
836
837
838
839
840
841
842 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
843 Throwable throwable = null;
844 int failureCount = 0;
845 boolean canRetry = true;
846
847 for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
848 responses.getResults().entrySet()) {
849
850 boolean regionFailureRegistered = false;
851 for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
852 Object result = regionResult.getSecond();
853
854
855 if (result == null || result instanceof Throwable) {
856 throwable = (Throwable) result;
857 Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
858 Row row = correspondingAction.getAction();
859 failureCount++;
860 if (!regionFailureRegistered) {
861 regionFailureRegistered= true;
862
863 hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
864 if (failureCount == 1) {
865 errorsByServer.reportServerError(location);
866 canRetry = errorsByServer.canRetryMore(numAttempt);
867 }
868 }
869
870 if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
871 throwable, location)) {
872 toReplay.add(correspondingAction);
873 }
874 } else {
875 if (callback != null || batchCallback != null) {
876 int index = regionResult.getFirst();
877 Action<Row> correspondingAction = initialActions.get(index);
878 Row row = correspondingAction.getAction();
879 if (callback != null) {
880
881 this.callback.success(index, resultsForRS.getKey(), row, (CResult) result);
882 }
883 if (batchCallback != null) {
884 batchCallback.update(resultsForRS.getKey(), row.getRow(), (CResult) result);
885 }
886 }
887 }
888 }
889 }
890
891
892
893
894 for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
895 throwable = throwableEntry.getValue();
896 byte[] region =throwableEntry.getKey();
897 List<Action<Row>> actions = multiAction.actions.get(region);
898 if (actions == null || actions.isEmpty()) {
899 throw new IllegalStateException("Wrong response for the region: " +
900 HRegionInfo.encodeRegionName(region));
901 }
902
903 if (failureCount == 0) {
904 errorsByServer.reportServerError(location);
905 canRetry = errorsByServer.canRetryMore(numAttempt);
906 }
907 hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
908 throwable, location);
909 failureCount += actions.size();
910
911 for (Action<Row> action : actions) {
912 Row row = action.getAction();
913 if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
914 toReplay.add(action);
915 }
916 }
917 }
918
919 logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
920 throwable, errorsByServer);
921 }
922
923 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
924 Throwable error, long backOffTime, boolean willRetry, String startTime){
925 StringBuilder sb = new StringBuilder();
926
927 sb.append("#").append(id).append(", table=").append(tableName).
928 append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
929
930 if (failureCount > 0 || error != null){
931 sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
932 append(error == null ? "null" : error);
933 } else {
934 sb.append("SUCCEEDED");
935 }
936
937 sb.append(" on ").append(sn);
938
939 sb.append(", tracking started ").append(startTime);
940
941 if (willRetry) {
942 sb.append(", retrying after ").append(backOffTime).append(" ms").
943 append(", replay ").append(replaySize).append(" ops.");
944 } else if (failureCount > 0) {
945 sb.append(" - FAILED, NOT RETRYING ANYMORE");
946 }
947
948 return sb.toString();
949 }
950
951
952
953
954
955 protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException {
956 synchronized (this.tasksDone) {
957 while (currentNumberOfTask == tasksDone.get()) {
958 try {
959 this.tasksDone.wait(100);
960 } catch (InterruptedException e) {
961 throw new InterruptedIOException("#" + id + ", interrupted." +
962 " currentNumberOfTask=" + currentNumberOfTask +
963 ", tableName=" + tableName + ", tasksDone=" + tasksDone.get());
964 }
965 }
966 }
967 }
968
969
970
971
972 private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
973 long lastLog = EnvironmentEdgeManager.currentTimeMillis();
974 long currentTasksDone = this.tasksDone.get();
975
976 while ((tasksSent.get() - currentTasksDone) > max) {
977 long now = EnvironmentEdgeManager.currentTimeMillis();
978 if (now > lastLog + 10000) {
979 lastLog = now;
980 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
981 + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
982 ", currentTasksDone=" + currentTasksDone + ", retries=" + retriesCnt.get() +
983 " hasError=" + hasError() + ", tableName=" + tableName);
984 }
985 waitForNextTaskDone(currentTasksDone);
986 currentTasksDone = this.tasksDone.get();
987 }
988 }
989
990 private long getCurrentTasksCount(){
991 return tasksSent.get() - tasksDone.get();
992 }
993
994
995
996
997 public void waitUntilDone() throws InterruptedIOException {
998 waitForMaximumCurrentTasks(0);
999 }
1000
1001
1002 public boolean hasError() {
1003 return hasError.get();
1004 }
1005
1006 public List<? extends Row> getFailedOperations() {
1007 return errors.actions;
1008 }
1009
1010
1011
1012
/**
 * Forgets every recorded failure and lowers the error flag. The two steps are not performed
 * atomically with respect to tasks that record a failure at the same time.
 */
public void clearErrors() {
  errors.clear();
  hasError.set(false);
}
1017
1018 public RetriesExhaustedWithDetailsException getErrors() {
1019 return errors.makeException();
1020 }
1021
1022
1023
1024
1025 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1026 tasksSent.incrementAndGet();
1027
1028 AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1029 if (serverCnt == null) {
1030 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1031 serverCnt = taskCounterPerServer.get(sn);
1032 }
1033 serverCnt.incrementAndGet();
1034
1035 for (byte[] regBytes : regions) {
1036 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1037 if (regionCnt == null) {
1038 regionCnt = new AtomicInteger();
1039 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1040 if (oldCnt != null) {
1041 regionCnt = oldCnt;
1042 }
1043 }
1044 regionCnt.incrementAndGet();
1045 }
1046 }
1047
1048
1049
1050
1051 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1052 for (byte[] regBytes : regions) {
1053 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1054 regionCnt.decrementAndGet();
1055 }
1056
1057 taskCounterPerServer.get(sn).decrementAndGet();
1058
1059 tasksDone.incrementAndGet();
1060 synchronized (tasksDone) {
1061 tasksDone.notifyAll();
1062 }
1063 }
1064
1065
1066
1067
1068
1069
1070
1071
1072 protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
1073 return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
1074 }
1075 }