1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.UndeclaredThrowableException;
25 import java.net.SocketTimeoutException;
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.DoNotRetryIOException;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.ipc.RpcClient;
35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36 import org.apache.hadoop.hbase.util.ExceptionUtil;
37 import org.apache.hadoop.ipc.RemoteException;
38
39 import com.google.protobuf.ServiceException;
40
41
42
43
44 @InterfaceAudience.Private
45 @edu.umd.cs.findbugs.annotations.SuppressWarnings
46 (value = "IS2_INCONSISTENT_SYNC", justification = "na")
47 public class RpcRetryingCaller<T> {
48 static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class);
49
50
51
52 private int callTimeout;
53
54
55
56 private long globalStartTime;
57
58
59
60 private final static int MIN_RPC_TIMEOUT = 2000;
61
62 private final int startLogErrorsCnt;
63
64 private final long pause;
65 private final int retries;
66
67 public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
68 this.pause = pause;
69 this.retries = retries;
70 this.startLogErrorsCnt = startLogErrorsCnt;
71 }
72
73 private void beforeCall() {
74 int remaining = (int)(callTimeout -
75 (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
76 if (remaining < MIN_RPC_TIMEOUT) {
77
78
79
80 remaining = MIN_RPC_TIMEOUT;
81 }
82 RpcClient.setRpcTimeout(remaining);
83 }
84
85 private void afterCall() {
86 RpcClient.resetRpcTimeout();
87 }
88
89 public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
90 RuntimeException {
91 return callWithRetries(callable, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
92 }
93
94
95
96
97
98
99
100
101
102 @edu.umd.cs.findbugs.annotations.SuppressWarnings
103 (value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "na")
104 public synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout)
105 throws IOException, RuntimeException {
106 this.callTimeout = callTimeout;
107 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
108 new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
109 this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
110 for (int tries = 0;; tries++) {
111 long expectedSleep = 0;
112 try {
113 beforeCall();
114 callable.prepare(tries != 0);
115 return callable.call();
116 } catch (Throwable t) {
117 if (tries > startLogErrorsCnt) {
118 LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" +
119 (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms, msg="
120 + callable.getExceptionMessageAdditionalDetail());
121 }
122
123 t = translateException(t);
124 callable.throwable(t, retries != 1);
125 RetriesExhaustedException.ThrowableWithExtraContext qt =
126 new RetriesExhaustedException.ThrowableWithExtraContext(t,
127 EnvironmentEdgeManager.currentTimeMillis(), toString());
128 exceptions.add(qt);
129 ExceptionUtil.rethrowIfInterrupt(t);
130 if (tries >= retries - 1) {
131 throw new RetriesExhaustedException(tries, exceptions);
132 }
133
134
135
136 expectedSleep = callable.sleep(pause, tries + 1);
137
138
139 long duration = singleCallDuration(expectedSleep);
140 if (duration > this.callTimeout) {
141 String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration +
142 ": " + callable.getExceptionMessageAdditionalDetail();
143 throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
144 }
145 } finally {
146 afterCall();
147 }
148 try {
149 Thread.sleep(expectedSleep);
150 } catch (InterruptedException e) {
151 throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
152 }
153 }
154 }
155
156
157
158
159
160 private long singleCallDuration(final long expectedSleep) {
161 return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)
162 + MIN_RPC_TIMEOUT + expectedSleep;
163 }
164
165
166
167
168
169
170
171
172
173
174 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
175 throws IOException, RuntimeException {
176
177 this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
178 this.callTimeout = callTimeout;
179 try {
180 beforeCall();
181 callable.prepare(false);
182 return callable.call();
183 } catch (Throwable t) {
184 Throwable t2 = translateException(t);
185 ExceptionUtil.rethrowIfInterrupt(t2);
186
187 if (t2 instanceof IOException) {
188 throw (IOException)t2;
189 } else {
190 throw new RuntimeException(t2);
191 }
192 } finally {
193 afterCall();
194 }
195 }
196
197
198
199
200
201
202
203
204 static Throwable translateException(Throwable t) throws DoNotRetryIOException {
205 if (t instanceof UndeclaredThrowableException) {
206 if (t.getCause() != null) {
207 t = t.getCause();
208 }
209 }
210 if (t instanceof RemoteException) {
211 t = ((RemoteException)t).unwrapRemoteException();
212 }
213 if (t instanceof LinkageError) {
214 throw new DoNotRetryIOException(t);
215 }
216 if (t instanceof ServiceException) {
217 ServiceException se = (ServiceException)t;
218 Throwable cause = se.getCause();
219 if (cause != null && cause instanceof DoNotRetryIOException) {
220 throw (DoNotRetryIOException)cause;
221 }
222
223 t = cause;
224
225 translateException(t);
226 } else if (t instanceof DoNotRetryIOException) {
227 throw (DoNotRetryIOException)t;
228 }
229 return t;
230 }
231 }