
1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collection;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.concurrent.ConcurrentSkipListMap;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.atomic.AtomicLong;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.DoNotRetryIOException;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
50  import org.apache.hadoop.hbase.client.coprocessor.Batch;
51  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54  import org.apache.hadoop.hbase.util.Pair;
55  import org.cloudera.htrace.Trace;
56  
57  import com.google.common.base.Preconditions;
58  
59  /**
60   * This class allows a continuous flow of requests. It's written to be compatible with a
61   * synchronous caller such as HTable.
62   * <p>
63   * The caller sends a buffer of operations by calling submit. This class extracts from this
64   * list the operations it can send, i.e. the operations that are on regions that are not
65   * considered as busy. The process is asynchronous, i.e. it returns immediately once it has
66   * finished iterating over the list. If, and only if, the maximum number of concurrent tasks
67   * is reached, the call to submit will block.
68   * </p>
69   * <p>
70   * The class manages retries internally.
71   * </p>
72   * <p>
73   * The class includes an error marker: it allows the caller to know whether an operation
74   * failed and to get the exception details, i.e. the full list of throwables for each attempt.
75   * This marker exists to help backward compatibility in HTable. In most (new) cases, errors
76   * should be managed by the callbacks.
77   * </p>
78   * <p>
79   * A callback is available, in order to: <list>
80   * <li>Get the result of the operation (failure or success)</li>
81   * <li>When an operation fails but could be retried, decide whether to retry it</li>
82   * <li>When an operation fails for good (can't be retried or has already been retried the
83   * maximum number of times), decide whether to register the error.</li>
84   * </list>
85   * <p>
86   * This class is not thread safe externally; only one thread should submit operations at a time.
87   * Internally, the class is thread safe enough to manage new submissions and results arising
88   * from older operations simultaneously.
89   * </p>
90   * <p>
91   * Internally, this class works with {@link Row}, which means it could theoretically be used
92   * for gets as well.
93   * </p>
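     * <p>
     * A minimal usage sketch (illustrative only; the connection, pool, configuration and the two
     * factories are assumed to already exist, and a null callback is passed since errors are
     * checked afterwards):
     * <pre>{@code
     * AsyncProcess<Object> ap = new AsyncProcess<Object>(connection, tableName, pool,
     *     null, conf, rpcCallerFactory, rpcControllerFactory);
     * ap.submitAll(puts);     // send every row, whatever the server load
     * ap.waitUntilDone();     // block until all tasks have finished
     * if (ap.hasError()) {
     *   throw ap.getErrors(); // RetriesExhaustedWithDetailsException with per-row details
     * }
     * }</pre>
     * </p>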
94   */
95  class AsyncProcess<CResult> {
96    private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
97  
98    /**
99     * Configure the number of failures after which the client will start logging. A few failures
100    * are fine: the region moved, then is not yet opened, then is overloaded. We try to have an
101    * acceptable heuristic for the number of errors we don't log. 9 was chosen because we wait
102    * for 1s at this stage.
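       * <p>
       * For example (an illustrative sketch, using the key defined just below), the threshold can
       * be tuned via {@code conf.setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 5)}.
       * </p>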
103    */
104   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
105       "hbase.client.start.log.errors.counter";
106   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
107 
108   protected static final AtomicLong COUNTER = new AtomicLong();
109   protected final long id;
110   private final int startLogErrorsCnt;
111   protected final HConnection hConnection;
112   protected final TableName tableName;
113   protected final ExecutorService pool;
114   protected final AsyncProcessCallback<CResult> callback;
115   protected final BatchErrors errors = new BatchErrors();
116   protected final AtomicBoolean hasError = new AtomicBoolean(false);
117   protected final AtomicLong tasksSent = new AtomicLong(0);
118   protected final AtomicLong tasksDone = new AtomicLong(0);
119   protected final AtomicLong retriesCnt = new AtomicLong(0);
120   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
121       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
122   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
123       new ConcurrentHashMap<ServerName, AtomicInteger>();
124   protected final int timeout;
125 
126   /**
127    * The number of tasks simultaneously executed on the cluster.
128    */
129   protected final int maxTotalConcurrentTasks;
130 
131   /**
132    * The number of tasks we run in parallel on a single region.
133    * With 1 (the default), we ensure that the ordering of the queries is respected: we don't
134    * start a set of operations on a region before the previous one is done. This also limits
135    * the pressure we put on the region server.
136    */
137   protected final int maxConcurrentTasksPerRegion;
138 
139   /**
140    * The number of tasks simultaneously executed on a single region server.
141    */
142   protected final int maxConcurrentTasksPerServer;
143   protected final long pause;
144   protected int numTries;
145   protected int serverTrackerTimeout;
146   protected RpcRetryingCallerFactory rpcCallerFactory;
147   private RpcControllerFactory rpcFactory;
148 
149 
150   /**
151    * This interface makes it possible to keep the contract of the previous synchronous
152    * interface, which uses an array of objects to return the results.
153    * <p/>
154    * This interface allows the caller to specify the behavior on errors: <list>
155    * <li>If we have not yet reached the maximum number of retries, the user can nevertheless
156    * specify whether this specific operation should be retried or not.
157    * </li>
158    * <li>If an operation fails (i.e. is not retried or fails after all retries), the user can
159    * specify whether we should mark this AsyncProcess as in error or not.
160    * </li>
161    * </list>
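       * <p/>
       * A minimal sketch of an implementation that stores results by their original index (the
       * {@code results} array is illustrative, not part of this interface):
       * <pre>{@code
       * AsyncProcessCallback<Object> cb = new AsyncProcessCallback<Object>() {
       *   public void success(int originalIndex, byte[] region, Row row, Object result) {
       *     results[originalIndex] = result;   // keep the result at its original position
       *   }
       *   public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
       *     return true;                       // store the error and mark the process as failed
       *   }
       *   public boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable t) {
       *     return true;                       // keep retrying until the retry count is exhausted
       *   }
       * };
       * }</pre>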
162    */
163   interface AsyncProcessCallback<CResult> {
164 
165     /**
166      * Called on success. originalIndex holds the index in the action list.
167      */
168     void success(int originalIndex, byte[] region, Row row, CResult result);
169 
170     /**
171      * Called on failure, if we don't retry (i.e. called once per failed operation).
172      *
173      * @return true if we should store the error and tag this async process as being in error.
174      *         false if the failure of this operation can be safely ignored: it does not require
175      *         the current process to be stopped, and the other operations in the queue can
176      *         proceed.
177      */
178     boolean failure(int originalIndex, byte[] region, Row row, Throwable t);
179 
180     /**
181      * Called on a failure we plan to retry. This allows the user to stop retrying. Will be
182      * called multiple times for a single action if it fails multiple times.
183      *
184      * @return true if we should retry, false otherwise.
185      */
186     boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception);
187   }
188 
189   private static class BatchErrors {
190     private final List<Throwable> throwables = new ArrayList<Throwable>();
191     private final List<Row> actions = new ArrayList<Row>();
192     private final List<String> addresses = new ArrayList<String>();
193 
194     public synchronized void add(Throwable ex, Row row, HRegionLocation location) {
195       if (row == null){
196         throw new IllegalArgumentException("row cannot be null. location=" + location);
197       }
198 
199       throwables.add(ex);
200       actions.add(row);
201       addresses.add(location != null ? location.getServerName().toString() : "null location");
202     }
203 
204     private synchronized RetriesExhaustedWithDetailsException makeException() {
205       return new RetriesExhaustedWithDetailsException(
206           new ArrayList<Throwable>(throwables),
207           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
208     }
209 
210     public synchronized void clear() {
211       throwables.clear();
212       actions.clear();
213       addresses.clear();
214     }
215   }
216 
217   public AsyncProcess(HConnection hc, TableName tableName, ExecutorService pool,
218       AsyncProcessCallback<CResult> callback, Configuration conf,
219       RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
220     if (hc == null){
221       throw new IllegalArgumentException("HConnection cannot be null.");
222     }
223 
224     this.hConnection = hc;
225     this.tableName = tableName;
226     this.pool = pool;
227     this.callback = callback;
228 
229     this.id = COUNTER.incrementAndGet();
230 
231     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
232         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
233     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
234         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
235     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
236         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
237 
238 
239     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
240       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
241     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
242           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
243     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
244           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
245 
246     this.startLogErrorsCnt =
247         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
248 
249     if (this.maxTotalConcurrentTasks <= 0) {
250       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
251     }
252     if (this.maxConcurrentTasksPerServer <= 0) {
253       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
254           maxConcurrentTasksPerServer);
255     }
256     if (this.maxConcurrentTasksPerRegion <= 0) {
257       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
258           maxConcurrentTasksPerRegion);
259     }
260 
261     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
262     // However, if we retry too quickly, we might fail very fast due to the retry count limit.
263     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
264     // retry time if normal retries were used. Then we will retry until this time runs out.
265     // If we keep hitting one server, the net effect will be the incremental backoff, and
266     // essentially the same number of retries as planned. If we have to do faster retries,
267     // we will do more retries in aggregate, but the user will be none the wiser.
268     this.serverTrackerTimeout = 0;
269     for (int i = 0; i < this.numTries; ++i) {
270       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
271     }
272 
273     this.rpcCallerFactory = rpcCaller;
274     Preconditions.checkNotNull(rpcFactory);
275     this.rpcFactory = rpcFactory;
276   }
277 
278   /**
279    * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
280    * list.
281    *
282    * @param rows - the submitted rows. Modified by the method: we remove the rows we took.
283    * @param atLeastOne true if we should submit at least a subset.
284    */
285   public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
286     submit(rows, atLeastOne, null);
287   }
288 
289   /**
290    * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
291    * list.
292    *
293    * @param rows - the submitted rows. Modified by the method: we remove the rows we took.
294    * @param atLeastOne true if we should submit at least a subset.
295    * @param batchCallback Batch callback. Only called on success.
296    */
297   public void submit(List<? extends Row> rows, boolean atLeastOne,
298       Batch.Callback<CResult> batchCallback) throws InterruptedIOException {
299     if (rows.isEmpty()) {
300       return;
301     }
302 
303     // This looks like we are keying by region but HRegionLocation has a comparator that compares
304     // on the server portion only (hostname + port) so this Map collects regions by server.
305     Map<HRegionLocation, MultiAction<Row>> actionsByServer =
306       new HashMap<HRegionLocation, MultiAction<Row>>();
307     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
308 
309     long currentTaskCnt = tasksDone.get();
310     boolean alreadyLooped = false;
311 
312     NonceGenerator ng = this.hConnection.getNonceGenerator();
313     do {
314       if (alreadyLooped){
315         // if, for whatever reason, we looped, we want to be sure that something has changed.
316         waitForNextTaskDone(currentTaskCnt);
317         currentTaskCnt = tasksDone.get();
318       } else {
319         alreadyLooped = true;
320       }
321 
322       // Wait until there is at least one slot for a new task.
323       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
324 
325       // Remember the previous decisions about regions or region servers we put in the
326       //  final multi.
327       Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
328       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
329 
330       int posInList = -1;
331       Iterator<? extends Row> it = rows.iterator();
332       while (it.hasNext()) {
333         Row r = it.next();
334         HRegionLocation loc = findDestLocation(r, posInList);
335 
336         if (loc == null) { // loc is null if there is an error such as meta not available.
337           it.remove();
338         } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
339           Action<Row> action = new Action<Row>(r, ++posInList);
340           setNonce(ng, r, action);
341           retainedActions.add(action);
342           addAction(loc, action, actionsByServer, ng);
343           it.remove();
344         }
345       }
346     } while (retainedActions.isEmpty() && atLeastOne && !hasError());
347 
348     HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
349     sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, batchCallback);
350   }
351 
352   /**
353    * Group the actions per region server.
354    *
355    * @param loc - the destination. Must not be null.
356    * @param action - the action to add to the multiaction
357    * @param actionsByServer the multiaction per server
358    * @param ng Nonce generator, or null if no nonces are needed.
359    */
360   private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
361       MultiAction<Row>> actionsByServer, NonceGenerator ng) {
362     final byte[] regionName = loc.getRegionInfo().getRegionName();
363     MultiAction<Row> multiAction = actionsByServer.get(loc);
364     if (multiAction == null) {
365       multiAction = new MultiAction<Row>();
366       actionsByServer.put(loc, multiAction);
367     }
368     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
369       // TODO: this code executes for every (re)try, and calls getNonceGroup again
370       //       for the same action. It must return the same value across calls.
371       multiAction.setNonceGroup(ng.getNonceGroup());
372     }
373 
374     multiAction.add(regionName, action);
375   }
376 
377   /**
378    * Find the destination.
379    *
380    * @param row          the row
381    * @param posInList    the position in the list
382    * @return the destination. Null if we couldn't find it.
383    */
384   private HRegionLocation findDestLocation(Row row, int posInList) {
385     if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
386     HRegionLocation loc = null;
387     IOException locationException = null;
388     try {
389       loc = hConnection.locateRegion(this.tableName, row.getRow());
390       if (loc == null) {
391         locationException = new IOException("#" + id + ", no location found, aborting submit for" +
392             " tableName=" + tableName +
393             " rowkey=" + Arrays.toString(row.getRow()));
394       }
395     } catch (IOException e) {
396       locationException = e;
397     }
398     if (locationException != null) {
399       // There are multiple retries in locateRegion already. No need to add new.
400       // We can't continue with this row, hence it's the last retry.
401       manageError(posInList, row, false, locationException, null);
402       return null;
403     }
404 
405     return loc;
406   }
407 
408   /**
409    * Check if we should send new operations to this region or region server.
410    * We take past decisions into account; if we have already accepted an
411    * operation on a given region, we accept all operations for this region.
412    *
413    * @param loc the region and the server name we want to use.
414    * @return true if the operation can be sent to this region/server; false if it is too busy.
415    */
416   protected boolean canTakeOperation(HRegionLocation loc,
417                                      Map<Long, Boolean> regionsIncluded,
418                                      Map<ServerName, Boolean> serversIncluded) {
419     long regionId = loc.getRegionInfo().getRegionId();
420     Boolean regionPrevious = regionsIncluded.get(regionId);
421 
422     if (regionPrevious != null) {
423       // We already know what to do with this region.
424       return regionPrevious;
425     }
426 
427     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
428     if (Boolean.FALSE.equals(serverPrevious)) {
429       // It's a new region, on a region server that we have already excluded.
430       regionsIncluded.put(regionId, Boolean.FALSE);
431       return false;
432     }
433 
434     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
435     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
436       // Too many tasks on this region already.
437       regionsIncluded.put(regionId, Boolean.FALSE);
438       return false;
439     }
440 
441     if (serverPrevious == null) {
442       // The region is ok, but we need to decide for this region server.
443       int newServers = 0; // number of servers we're going to contact so far
444       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
445         if (kv.getValue()) {
446           newServers++;
447         }
448       }
449 
450       // Do we have too many total tasks already?
451       boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;
452 
453       if (ok) {
454         // If the total is fine, is it ok for this individual server?
455         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
456         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
457       }
458 
459       if (!ok) {
460         regionsIncluded.put(regionId, Boolean.FALSE);
461         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
462         return false;
463       }
464 
465       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
466     } else {
467       assert serverPrevious.equals(Boolean.TRUE);
468     }
469 
470     regionsIncluded.put(regionId, Boolean.TRUE);
471 
472     return true;
473   }
474 
475   /**
476    * Immediately submit the list of rows, whatever the server status. Kept for backward
477    * compatibility: it allows use with the batch interface, which returns an array of objects.
478    *
479    * @param rows the list of rows.
480    */
481   public void submitAll(List<? extends Row> rows) {
482     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
483 
484     // The position will be used by processBatch to match the returned object array.
485     int posInList = -1;
486     NonceGenerator ng = this.hConnection.getNonceGenerator();
487     for (Row r : rows) {
488       posInList++;
489       if (r instanceof Put) {
490         Put put = (Put) r;
491         if (put.isEmpty()) {
492           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
493         }
494       }
495       Action<Row> action = new Action<Row>(r, posInList);
496       setNonce(ng, r, action);
497       actions.add(action);
498     }
499     HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
500     submit(actions, actions, 1, errorsByServer);
501   }
502 
503   private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
504     if (!(r instanceof Append) && !(r instanceof Increment)) return;
505     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
506   }
507 
508 
509   /**
510    * Group a list of actions per region server, and send them. Does not take into account the
511    * region/server load.
512    *
513    * @param initialActions - the full list of the actions in progress
514    * @param currentActions - the list of actions to submit
515    * @param numAttempt - the current attempt number (first attempt is 1)
516    */
517   private void submit(List<Action<Row>> initialActions,
518                       List<Action<Row>> currentActions, int numAttempt,
519                       final HConnectionManager.ServerErrorTracker errorsByServer) {
520 
521     if (numAttempt > 1){
522       retriesCnt.incrementAndGet();
523     }
524 
525     // Group per location => region server
526     final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
527         new HashMap<HRegionLocation, MultiAction<Row>>();
528 
529     NonceGenerator ng = this.hConnection.getNonceGenerator();
530     for (Action<Row> action : currentActions) {
531       HRegionLocation loc = findDestLocation(action.getAction(), action.getOriginalIndex());
532       if (loc != null) {
533         addAction(loc, action, actionsByServer, ng);
534       }
535     }
536 
537     if (!actionsByServer.isEmpty()) {
538       sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer, null);
539     }
540   }
541 
542   /**
543    * Send a multi action structure to the servers, after a delay depending on the attempt
544    * number. Asynchronous.
545    *
546    * @param initialActions  the list of the actions, flat.
547    * @param actionsByServer the actions grouped by server
548    * @param numAttempt      the attempt number.
549    */
550   public void sendMultiAction(final List<Action<Row>> initialActions,
551                               Map<HRegionLocation, MultiAction<Row>> actionsByServer,
552                               final int numAttempt,
553                               final HConnectionManager.ServerErrorTracker errorsByServer,
554                               Batch.Callback<CResult> batchCallback) {
555     // Send the queries, one or more runnables per server.
556     // This iteration is by server (the HRegionLocation comparator is by server portion only).
557     for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
558       HRegionLocation loc = e.getKey();
559       MultiAction<Row> multiAction = e.getValue();
560       Collection<? extends Runnable> runnables = getNewMultiActionRunnable(initialActions, loc,
561         multiAction, numAttempt, errorsByServer, batchCallback);
562       for (Runnable runnable: runnables) {
563         try {
564           incTaskCounters(multiAction.getRegions(), loc.getServerName());
565           this.pool.submit(runnable);
566         } catch (RejectedExecutionException ree) {
567           // This should never happen. But as the pool is provided by the end user, let's secure
568           //  this a little.
569           decTaskCounters(multiAction.getRegions(), loc.getServerName());
570           LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
571             " Server is " + loc.getServerName(), ree);
572           // We're likely to fail again, but this will increment the attempt counter, so it will
573           //  finish.
574           receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer);
575         }
576       }
577     }
578   }
579 
580   private Runnable getNewSingleServerRunnable(
581       final List<Action<Row>> initialActions,
582       final HRegionLocation loc,
583       final MultiAction<Row> multiAction,
584       final int numAttempt,
585       final HConnectionManager.ServerErrorTracker errorsByServer,
586       final Batch.Callback<CResult> batchCallback) {
587     return new Runnable() {
588       @Override
589       public void run() {
590         MultiResponse res;
591         try {
592           MultiServerCallable<Row> callable = createCallable(loc, multiAction);
593           try {
594             res = createCaller(callable).callWithoutRetries(callable, timeout);
595           } catch (IOException e) {
596             // The service itself failed. It may be an error coming from the communication
597             //   layer, or a functional error raised by the server.
598             receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e,
599                 errorsByServer);
600             return;
601           } catch (Throwable t) {
602             // This should not happen. Let's log & retry anyway.
603             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
604                 " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t);
605             receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t,
606                 errorsByServer);
607             return;
608           }
609 
610           // Nominal case: we received an answer from the server, and it's not an exception.
611           receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer,
612             batchCallback);
613 
614         } finally {
615           decTaskCounters(multiAction.getRegions(), loc.getServerName());
616         }
617       }
618     };
619   }
620 
621   private Collection<? extends Runnable> getNewMultiActionRunnable(
622       final List<Action<Row>> initialActions,
623       final HRegionLocation loc,
624       final MultiAction<Row> multiAction,
625       final int numAttempt,
626       final HConnectionManager.ServerErrorTracker errorsByServer,
627       final Batch.Callback<CResult> batchCallback) {
628     // no stats to manage, just do the standard action
629     if (AsyncProcess.this.hConnection.getStatisticsTracker() == null) {
630       List<Runnable> toReturn = new ArrayList<Runnable>(1);
631       toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", 
632         getNewSingleServerRunnable(initialActions, loc, multiAction, numAttempt,
633           errorsByServer, batchCallback)));
634       return toReturn;
635     } else {
636       // group the actions by the amount of delay
637       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
638         .size());
639 
640       // split up the actions
641       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
642         Long backoff = getBackoff(loc);
643         DelayingRunner runner = actions.get(backoff);
644         if (runner == null) {
645           actions.put(backoff, new DelayingRunner(backoff, e));
646         } else {
647           runner.add(e);
648         }
649       }
650 
651       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
652       for (DelayingRunner runner : actions.values()) {
653         String traceText = "AsyncProcess.sendMultiAction";
654         Runnable runnable = getNewSingleServerRunnable(initialActions, loc, runner.getActions(),
655           numAttempt, errorsByServer, batchCallback);
656         // use a delay runner only if we need to sleep for some time
657         if (runner.getSleepTime() > 0) {
658           runner.setRunner(runnable);
659           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
660           runnable = runner;
661         }
662         runnable = Trace.wrap(traceText, runnable);
663         toReturn.add(runnable);
664       }
665       return toReturn;
666     }
667   }
668 
669   /**
670    * @param location the location (server and region) where the target region is hosted and
671    * to which we are going to write some data
672    * @return the amount of time the client should wait before it submits a request to the
673    * specified server and region
674    */
675   private Long getBackoff(HRegionLocation location) {
676     ServerStatisticTracker tracker = AsyncProcess.this.hConnection.getStatisticsTracker();
677     ServerStatistics stats = tracker.getStats(location.getServerName());
678     return AsyncProcess.this.hConnection.getBackoffPolicy()
679       .getBackoffTime(location.getServerName(), location.getRegionInfo().getRegionName(),
680         stats);
681   }
682 
683   /**
684    * Create a callable. Isolated to be easily overridden in the tests.
685    */
686   protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
687       final MultiAction<Row> multi) {
688     return new MultiServerCallable<Row>(hConnection, tableName, location, this.rpcFactory, multi);
689   }
690 
691   /**
692    * For tests.
693    * @param callable used in tests
694    * @return a caller.
695    */
696   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
697     return rpcCallerFactory.<MultiResponse> newCaller();
698   }
699 
700   /**
701    * Checks whether we can retry and acts accordingly: logs, sets the error status, calls the callbacks.
702    *
703    * @param originalIndex the position in the list sent
704    * @param row           the row
705    * @param canRetry      if false, we won't retry whatever the settings.
706    * @param throwable     the throwable, if any (can be null)
707    * @param location      the location, if any (can be null)
708    * @return true if the action can be retried, false otherwise.
709    */
710   private boolean manageError(int originalIndex, Row row, boolean canRetry,
711                               Throwable throwable, HRegionLocation location) {
712     if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
713       canRetry = false;
714     }
715 
716     byte[] region = null;
717     if (canRetry && callback != null) {
718       region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
719       canRetry = callback.retriableFailure(originalIndex, row, region, throwable);
720     }
721 
722     if (!canRetry) {
723       if (callback != null) {
724         if (region == null && location != null) {
725           region = location.getRegionInfo().getEncodedNameAsBytes();
726         }
727         callback.failure(originalIndex, region, row, throwable);
728       }
729       errors.add(throwable, row, location);
730       this.hasError.set(true);
731     }
732 
733     return canRetry;
734   }
735 
736   /**
737    * Resubmit all the actions from this multiaction after a failure.
738    *
739    * @param initialActions the full initial action list
740    * @param rsActions  the actions still to do from the initial list
741    * @param location   the destination
742    * @param numAttempt the number of attempts so far
743    * @param t the throwable (if any) that caused the resubmit
744    */
745   private void receiveGlobalFailure(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
746                                     HRegionLocation location, int numAttempt, Throwable t,
747                                     HConnectionManager.ServerErrorTracker errorsByServer) {
748     // Do not use the exception for updating cache because it might be coming from
749     // any of the regions in the MultiAction.
750     hConnection.updateCachedLocations(tableName,
751       rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
752     errorsByServer.reportServerError(location);
753     boolean canRetry = errorsByServer.canRetryMore(numAttempt);
754 
755     List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
756     for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
757       for (Action<Row> action : e.getValue()) {
758         if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, location)) {
759           toReplay.add(action);
760         }
761       }
762     }
763 
764     logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(),
765         t, errorsByServer);
766   }
767 
768   /**
769    * Log as much information as possible and, if there is something to replay, submit it again
770    * after a backoff sleep.
771    */
772   private void logAndResubmit(List<Action<Row>> initialActions, HRegionLocation oldLocation,
773                               List<Action<Row>> toReplay, int numAttempt, int failureCount,
774                               Throwable throwable,
775                               HConnectionManager.ServerErrorTracker errorsByServer) {
776     if (toReplay.isEmpty()) {
777       // it's either a success or a last failure
778       if (failureCount != 0) {
779         // We have a failure but nothing to retry. We're done, it's a final failure.
780         LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
781             oldLocation.getServerName(), throwable, -1, false,
782             errorsByServer.getStartTrackingTime()));
783       } else if (numAttempt > startLogErrorsCnt + 1) {
784         // The operation was successful, but needed several attempts. Let's log this.
785         LOG.info(createLog(numAttempt, failureCount, 0,
786             oldLocation.getServerName(), throwable, -1, false,
787             errorsByServer.getStartTrackingTime()));
788       }
789       return;
790     }
791 
792     // We have something to replay. We're going to sleep a little before.
793 
794     // We have two contradicting needs here:
795     //  1) We want to get the new location after having slept, as it may change.
796     //  2) We want to take into account the location when calculating the sleep time.
797     // It should be possible to have some heuristics to take the right decision. Short term,
798     //  we go for one.
799     long backOffTime = errorsByServer.calculateBackoffTime(oldLocation, pause);
800 
801     if (numAttempt > startLogErrorsCnt) {
802       // We use this value to have some logs when we have multiple failures, but not too many
803       //  logs, as errors are to be expected when a region moves, splits and so on
804       LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
805           oldLocation.getServerName(), throwable, backOffTime, true,
806           errorsByServer.getStartTrackingTime()));
807     }
808 
809     try {
810       Thread.sleep(backOffTime);
811     } catch (InterruptedException e) {
812       LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldLocation, e);
813       Thread.currentThread().interrupt();
814       return;
815     }
816 
817     submit(initialActions, toReplay, numAttempt + 1, errorsByServer);
818   }
819 
820   /**
821    * Called when we receive the result of a server query.
822    *
823    * @param initialActions - the whole action list
824    * @param multiAction    - the multiAction we sent
825    * @param location       - the location. It's used as a server name.
826    * @param responses      - the response, if any
827    * @param numAttempt     - the attempt
828    */
829   private void receiveMultiAction(List<Action<Row>> initialActions, MultiAction<Row> multiAction,
830                                   HRegionLocation location,
831                                   MultiResponse responses, int numAttempt,
832                                   HConnectionManager.ServerErrorTracker errorsByServer,
833                                   Batch.Callback<CResult> batchCallback) {
834      assert responses != null;
835 
836     // Success or partial success
837     // Analyze detailed results. We can still have individual failures to be redone.
838     // Two specific throwables are managed:
839     //  - DoNotRetryIOException: we continue to retry for other actions
840     //  - RegionMovedException: we update the cache with the new region location
841 
842     List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
843     Throwable throwable = null;
844     int failureCount = 0;
845     boolean canRetry = true;
846 
847     for (Map.Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
848         responses.getResults().entrySet()) {
849 
850       boolean regionFailureRegistered = false;
851       for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
852         Object result = regionResult.getSecond();
853 
854         // Failure: retry if it makes sense, else update the error lists
855         if (result == null || result instanceof Throwable) {
856           throwable = (Throwable) result;
857           Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
858           Row row = correspondingAction.getAction();
859           failureCount++;
860           if (!regionFailureRegistered) { // We're doing this once per location.
861             regionFailureRegistered= true;
862             // The location here is used as a server name.
863             hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
864             if (failureCount == 1) {
865               errorsByServer.reportServerError(location);
866               canRetry = errorsByServer.canRetryMore(numAttempt);
867             }
868           }
869 
870           if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
871               throwable, location)) {
872             toReplay.add(correspondingAction);
873           }
874         } else { // success
875           if (callback != null || batchCallback != null) {
876             int index = regionResult.getFirst();
877             Action<Row> correspondingAction = initialActions.get(index);
878             Row row = correspondingAction.getAction();
879             if (callback != null) {
880               //noinspection unchecked
881               this.callback.success(index, resultsForRS.getKey(), row, (CResult) result);
882             }
883             if (batchCallback != null) {
884               batchCallback.update(resultsForRS.getKey(), row.getRow(), (CResult) result);
885             }
886           }
887         }
888       }
889     }
890 
891     // The failures global to a region. We will use the multiAction we sent previously to find the
892     //   actions to replay.
893 
894     for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
895       throwable = throwableEntry.getValue();
896       byte[] region = throwableEntry.getKey();
897       List<Action<Row>> actions = multiAction.actions.get(region);
898       if (actions == null || actions.isEmpty()) {
899         throw new IllegalStateException("Wrong response for the region: " +
900             HRegionInfo.encodeRegionName(region));
901       }
902 
903       if (failureCount == 0) {
904         errorsByServer.reportServerError(location);
905         canRetry = errorsByServer.canRetryMore(numAttempt);
906       }
907       hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(),
908           throwable, location);
909       failureCount += actions.size();
910 
911       for (Action<Row> action : actions) {
912         Row row = action.getAction();
913         if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) {
914           toReplay.add(action);
915         }
916       }
917     }
918 
919     logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount,
920         throwable, errorsByServer);
921   }
922 
923   private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
924                            Throwable error, long backOffTime, boolean willRetry, String startTime){
925     StringBuilder sb = new StringBuilder();
926 
927     sb.append("#").append(id).append(", table=").append(tableName).
928         append(", attempt=").append(numAttempt).append("/").append(numTries).append(" ");
929 
930     if (failureCount > 0 || error != null){
931       sb.append("failed ").append(failureCount).append(" ops").append(", last exception: ").
932           append(error == null ? "null" : error);
933     } else {
934       sb.append("SUCCEEDED");
935     }
936 
937     sb.append(" on ").append(sn);
938 
939     sb.append(", tracking started ").append(startTime);
940 
941     if (willRetry) {
942       sb.append(", retrying after ").append(backOffTime).append(" ms").
943           append(", replay ").append(replaySize).append(" ops.");
944     } else if (failureCount > 0) {
945       sb.append(" - FAILED, NOT RETRYING ANYMORE");
946     }
947 
948     return sb.toString();
949   }
950 
951   /**
952    * Waits for another task to finish.
953    * @param currentNumberOfTask - the number of tasks finished when calling the method.
954    */
955   protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException {
956     synchronized (this.tasksDone) {
957       while (currentNumberOfTask == tasksDone.get()) {
958         try {
959           this.tasksDone.wait(100);
960         } catch (InterruptedException e) {
961           throw new InterruptedIOException("#" + id + ", interrupted." +
962               " currentNumberOfTask=" + currentNumberOfTask +
963               ",  tableName=" + tableName + ", tasksDone=" + tasksDone.get());
964         }
965       }
966     }
967   }
968 
969   /**
970    * Wait until this AsyncProcess has no more than max tasks in progress.
971    */
972   private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
973     long lastLog = EnvironmentEdgeManager.currentTimeMillis();
974     long currentTasksDone = this.tasksDone.get();
975 
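        // The number of tasks in flight is tasksSent - tasksDone; wait while it exceeds max.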
976     while ((tasksSent.get() - currentTasksDone) > max) {
977       long now = EnvironmentEdgeManager.currentTimeMillis();
978       if (now > lastLog + 10000) {
979         lastLog = now;
980         LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
981             + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
982             ", currentTasksDone=" + currentTasksDone + ", retries=" + retriesCnt.get() +
983             " hasError=" + hasError() + ", tableName=" + tableName);
984       }
985       waitForNextTaskDone(currentTasksDone);
986       currentTasksDone = this.tasksDone.get();
987     }
988   }
989 
990   private long getCurrentTasksCount(){
991     return  tasksSent.get() - tasksDone.get();
992   }
993 
994   /**
995    * Wait until all tasks are executed, successfully or not.
996    */
997   public void waitUntilDone() throws InterruptedIOException {
998     waitForMaximumCurrentTasks(0);
999   }
1000 
1001 
1002   public boolean hasError() {
1003     return hasError.get();
1004   }
1005 
1006   public List<? extends Row> getFailedOperations() {
1007     return errors.actions;
1008   }
1009 
1010   /**
1011    * Clear the error stacks. Should be called only when there are no actions in progress.
1012    */
1013   public void clearErrors() {
1014     errors.clear();
1015     hasError.set(false);
1016   }
1017 
1018   public RetriesExhaustedWithDetailsException getErrors() {
1019     return errors.makeException();
1020   }
1021 
1022   /**
1023    * Increment the task counters for a given set of regions. MT safe.
1024    */
1025   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1026     tasksSent.incrementAndGet();
1027 
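         // Lazily create the per-server counter; putIfAbsent keeps this safe if another thread races us.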
1028     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1029     if (serverCnt == null) {
1030       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1031       serverCnt = taskCounterPerServer.get(sn);
1032     }
1033     serverCnt.incrementAndGet();
1034 
1035     for (byte[] regBytes : regions) {
1036       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1037       if (regionCnt == null) {
1038         regionCnt = new AtomicInteger();
1039         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1040         if (oldCnt != null) {
1041           regionCnt = oldCnt;
1042         }
1043       }
1044       regionCnt.incrementAndGet();
1045     }
1046   }
1047 
1048   /**
1049    * Decrements the task counters for a given set of regions and the region server. MT safe.
1050    */
1051   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1052     for (byte[] regBytes : regions) {
1053       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1054       regionCnt.decrementAndGet();
1055     }
1056 
1057     taskCounterPerServer.get(sn).decrementAndGet();
1058 
1059     tasksDone.incrementAndGet();
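         // Wake up any caller blocked in waitForNextTaskDone().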
1060     synchronized (tasksDone) {
1061       tasksDone.notifyAll();
1062     }
1063   }
1064 
1065   /**
1066    * Creates the server error tracker to use inside the process.
1067    * Currently, to preserve the main assumption about current retries, and to work well with
1068    * the retry-limit-based calculation, the calculation is local per Process object.
1069    * We may benefit from connection-wide tracking of server errors.
1070    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
1071    */
1072   protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
1073     return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
1074   }
1075 }