1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.testclassification.MediumTests;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
39 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
40 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
41 import org.apache.hadoop.hbase.security.User;
42 import org.apache.log4j.AppenderSkeleton;
43 import org.apache.log4j.Level;
44 import org.apache.log4j.Logger;
45 import org.apache.log4j.spi.LoggingEvent;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 import com.google.common.collect.Lists;
50 import com.google.protobuf.BlockingRpcChannel;
51 import com.google.protobuf.BlockingService;
52 import com.google.protobuf.RpcController;
53 import com.google.protobuf.ServiceException;
54
55
56
57
58
59
60 @Category(MediumTests.class)
61 public class TestDelayedRpc {
62 private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
63 public static RpcServerInterface rpcServer;
64 public static final int UNDELAYED = 0;
65 public static final int DELAYED = 1;
66 private static final int RPC_CLIENT_TIMEOUT = 30000;
67
68 @Test (timeout=60000)
69 public void testDelayedRpcImmediateReturnValue() throws Exception {
70 testDelayedRpc(false);
71 }
72
73 @Test (timeout=60000)
74 public void testDelayedRpcDelayedReturnValue() throws Exception {
75 testDelayedRpc(true);
76 }
77
78 private void testDelayedRpc(boolean delayReturnValue) throws Exception {
79 LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
80 Configuration conf = HBaseConfiguration.create();
81 InetSocketAddress isa = new InetSocketAddress("localhost", 0);
82 TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
83 BlockingService service =
84 TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
85 rpcServer = new RpcServer(null, "testDelayedRpc",
86 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
87 isa,
88 conf,
89 new FifoRpcScheduler(conf, 1));
90 rpcServer.start();
91 RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
92 try {
93 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
94 ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
95 rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
96 User.getCurrent(), RPC_CLIENT_TIMEOUT);
97 TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
98 TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
99 List<Integer> results = new ArrayList<Integer>();
100
101 TestThread th1 = new TestThread(stub, true, results);
102
103 TestThread th2 = new TestThread(stub, false, results);
104 TestThread th3 = new TestThread(stub, false, results);
105 th1.start();
106 Thread.sleep(100);
107 th2.start();
108 Thread.sleep(200);
109 th3.start();
110
111 th1.join();
112 th2.join();
113 th3.join();
114
115
116 assertEquals(UNDELAYED, results.get(0).intValue());
117 assertEquals(UNDELAYED, results.get(1).intValue());
118 assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
119 } finally {
120 rpcClient.stop();
121 }
122 }
123
124 private static class ListAppender extends AppenderSkeleton {
125 private final List<String> messages = new ArrayList<String>();
126
127 @Override
128 protected void append(LoggingEvent event) {
129 messages.add(event.getMessage().toString());
130 }
131
132 @Override
133 public void close() {
134 }
135
136 @Override
137 public boolean requiresLayout() {
138 return false;
139 }
140
141 public List<String> getMessages() {
142 return messages;
143 }
144 }
145
146
147
148
149
150 @Test (timeout=60000)
151 public void testTooManyDelayedRpcs() throws Exception {
152 Configuration conf = HBaseConfiguration.create();
153 final int MAX_DELAYED_RPC = 10;
154 conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
155
156 ListAppender listAppender = new ListAppender();
157 Logger log = Logger.getLogger("org.apache.hadoop.ipc.RpcServer");
158 log.addAppender(listAppender);
159 log.setLevel(Level.WARN);
160
161
162 InetSocketAddress isa = new InetSocketAddress("localhost", 0);
163 TestDelayedImplementation instance = new TestDelayedImplementation(true);
164 BlockingService service =
165 TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
166 rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
167 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
168 isa,
169 conf,
170 new FifoRpcScheduler(conf, 1));
171 rpcServer.start();
172 RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
173 try {
174 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
175 ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
176 rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
177 User.getCurrent(), RPC_CLIENT_TIMEOUT);
178 TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
179 TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
180 Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
181 for (int i = 0; i < MAX_DELAYED_RPC; i++) {
182 threads[i] = new TestThread(stub, true, null);
183 threads[i].start();
184 }
185
186
187 assertTrue(listAppender.getMessages().isEmpty());
188
189
190 threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
191 threads[MAX_DELAYED_RPC].start();
192
193 for (int i = 0; i < MAX_DELAYED_RPC; i++) {
194 threads[i].join();
195 }
196
197 assertFalse(listAppender.getMessages().isEmpty());
198 assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
199
200 log.removeAppender(listAppender);
201 } finally {
202 rpcClient.stop();
203 }
204 }
205
206 public static class TestDelayedImplementation
207 implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
208
209
210
211
212 private final boolean delayReturnValue;
213
214
215
216
217
218 public TestDelayedImplementation(boolean delayReturnValue) {
219 this.delayReturnValue = delayReturnValue;
220 }
221
222 @Override
223 public TestResponse test(final RpcController rpcController, final TestArg testArg)
224 throws ServiceException {
225 boolean delay = testArg.getDelay();
226 TestResponse.Builder responseBuilder = TestResponse.newBuilder();
227 if (!delay) {
228 responseBuilder.setResponse(UNDELAYED);
229 return responseBuilder.build();
230 }
231 final Delayable call = RpcServer.getCurrentCall();
232 call.startDelay(delayReturnValue);
233 new Thread() {
234 @Override
235 public void run() {
236 try {
237 Thread.sleep(500);
238 TestResponse.Builder responseBuilder = TestResponse.newBuilder();
239 call.endDelay(delayReturnValue ?
240 responseBuilder.setResponse(DELAYED).build() : null);
241 } catch (Exception e) {
242 e.printStackTrace();
243 }
244 }
245 }.start();
246
247
248 responseBuilder.setResponse(0xDEADBEEF);
249 return responseBuilder.build();
250 }
251 }
252
253 public static class TestThread extends Thread {
254 private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
255 private final boolean delay;
256 private final List<Integer> results;
257
258 public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
259 boolean delay, List<Integer> results) {
260 this.stub = stub;
261 this.delay = delay;
262 this.results = results;
263 }
264
265 @Override
266 public void run() {
267 Integer result;
268 try {
269 result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
270 getResponse());
271 } catch (ServiceException e) {
272 throw new RuntimeException(e);
273 }
274 if (results != null) {
275 synchronized (results) {
276 results.add(result);
277 }
278 }
279 }
280 }
281
282 @Test
283 public void testEndDelayThrowing() throws IOException {
284 Configuration conf = HBaseConfiguration.create();
285 InetSocketAddress isa = new InetSocketAddress("localhost", 0);
286 FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
287 BlockingService service =
288 TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
289 rpcServer = new RpcServer(null, "testEndDelayThrowing",
290 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
291 isa,
292 conf,
293 new FifoRpcScheduler(conf, 1));
294 rpcServer.start();
295 RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
296 try {
297 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
298 ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
299 rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
300 User.getCurrent(), 1000);
301 TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
302 TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
303
304 int result = 0xDEADBEEF;
305
306 try {
307 result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
308 } catch (Exception e) {
309 fail("No exception should have been thrown.");
310 }
311 assertEquals(result, UNDELAYED);
312
313 boolean caughtException = false;
314 try {
315 result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
316 } catch(Exception e) {
317
318 if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
319 caughtException = true;
320 }
321 LOG.warn("Caught exception, expected=" + caughtException);
322 }
323 assertTrue(caughtException);
324 } finally {
325 rpcClient.stop();
326 }
327 }
328
329
330
331
332 private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
333 public FaultyTestDelayedImplementation() {
334 super(false);
335 }
336
337 @Override
338 public TestResponse test(RpcController rpcController, TestArg arg)
339 throws ServiceException {
340 LOG.info("In faulty test, delay=" + arg.getDelay());
341 if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
342 Delayable call = RpcServer.getCurrentCall();
343 call.startDelay(true);
344 LOG.info("In faulty test, delaying");
345 try {
346 call.endDelayThrowing(new Exception("Something went wrong"));
347 } catch (IOException e) {
348 e.printStackTrace();
349 }
350
351 return TestResponse.newBuilder().setResponse(DELAYED).build();
352 }
353 }
354 }