1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24 import static org.mockito.Matchers.anyInt;
25 import static org.mockito.Matchers.anyObject;
26 import static org.mockito.Mockito.doThrow;
27 import static org.mockito.Mockito.spy;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.internal.verification.VerificationModeFactory.times;
30
31 import java.io.IOException;
32 import java.net.InetSocketAddress;
33 import java.net.Socket;
34 import java.util.ArrayList;
35 import java.util.List;
36
37 import javax.net.SocketFactory;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.CellScannable;
44 import org.apache.hadoop.hbase.CellScanner;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.HBaseConfiguration;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.KeyValue;
50 import org.apache.hadoop.hbase.KeyValueUtil;
51 import org.apache.hadoop.hbase.client.Put;
52 import org.apache.hadoop.hbase.client.RowMutations;
53 import org.apache.hadoop.hbase.codec.Codec;
54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
55 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
56 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
57 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
58 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
59 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
60 import org.apache.hadoop.hbase.protobuf.RequestConverter;
61 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
62 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
64 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
65 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
66 import org.apache.hadoop.hbase.security.User;
67 import org.apache.hadoop.hbase.testclassification.SmallTests;
68 import org.apache.hadoop.hbase.util.Bytes;
69 import org.apache.hadoop.hbase.util.Pair;
70 import org.apache.hadoop.io.compress.GzipCodec;
71 import org.apache.hadoop.net.NetUtils;
72 import org.apache.hadoop.util.StringUtils;
73 import org.junit.Test;
74 import org.junit.experimental.categories.Category;
75 import org.mockito.Mockito;
76 import org.mockito.invocation.InvocationOnMock;
77 import org.mockito.stubbing.Answer;
78
79 import com.google.common.collect.ImmutableList;
80 import com.google.common.collect.Lists;
81 import com.google.protobuf.BlockingService;
82 import com.google.protobuf.ByteString;
83 import com.google.protobuf.Descriptors.MethodDescriptor;
84 import com.google.protobuf.Message;
85 import com.google.protobuf.RpcController;
86 import com.google.protobuf.ServiceException;
87
88
89
90
91 @Category(SmallTests.class)
92 public class TestIPC {
93 public static final Log LOG = LogFactory.getLog(TestIPC.class);
94 static byte [] CELL_BYTES = Bytes.toBytes("xyz");
95 static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
96 static byte [] BIG_CELL_BYTES = new byte [10 * 1024];
97 static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
98 private final static Configuration CONF = HBaseConfiguration.create();
99
100
101
102
103 private static final BlockingService SERVICE =
104 TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
105 new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
106
107 @Override
108 public EmptyResponseProto ping(RpcController controller,
109 EmptyRequestProto request) throws ServiceException {
110
111 return null;
112 }
113
114 @Override
115 public EmptyResponseProto error(RpcController controller,
116 EmptyRequestProto request) throws ServiceException {
117
118 return null;
119 }
120
121 @Override
122 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
123 throws ServiceException {
124 if (controller instanceof PayloadCarryingRpcController) {
125 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
126
127
128
129 CellScanner cellScanner = pcrc.cellScanner();
130 List<Cell> list = null;
131 if (cellScanner != null) {
132 list = new ArrayList<Cell>();
133 try {
134 while(cellScanner.advance()) {
135 list.add(cellScanner.current());
136 }
137 } catch (IOException e) {
138 throw new ServiceException(e);
139 }
140 }
141 cellScanner = CellUtil.createCellScanner(list);
142 ((PayloadCarryingRpcController)controller).setCellScanner(cellScanner);
143 }
144 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
145 }
146 });
147
148
149
150
151
152 private static class TestRpcServer extends RpcServer {
153
154 TestRpcServer() throws IOException {
155 this(new FifoRpcScheduler(CONF, 1));
156 }
157
158 TestRpcServer(RpcScheduler scheduler) throws IOException {
159 super(null, "testRpcServer",
160 Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
161 new InetSocketAddress("localhost", 0), CONF, scheduler);
162 }
163
164 @Override
165 public Pair<Message, CellScanner> call(BlockingService service,
166 MethodDescriptor md, Message param, CellScanner cellScanner,
167 long receiveTime, MonitoredRPCHandler status) throws IOException {
168 return super.call(service, md, param, cellScanner, receiveTime, status);
169 }
170 }
171
172
173
174
175
176
177 @Test
178 public void testNoCodec() throws InterruptedException, IOException {
179 Configuration conf = HBaseConfiguration.create();
180 RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT) {
181 @Override
182 Codec getCodec() {
183 return null;
184 }
185 };
186 TestRpcServer rpcServer = new TestRpcServer();
187 try {
188 rpcServer.start();
189 InetSocketAddress address = rpcServer.getListenerAddress();
190 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
191 final String message = "hello";
192 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
193 Pair<Message, CellScanner> r = client.call(md, param, null,
194 md.getOutputType().toProto(), User.getCurrent(), address, 0);
195 assertTrue(r.getSecond() == null);
196
197 assertTrue(r.getFirst().toString().contains(message));
198 } finally {
199 client.stop();
200 rpcServer.stop();
201 }
202 }
203
204
205
206
207
208
209
210
211
212
213 @Test
214 public void testCompressCellBlock()
215 throws IOException, InterruptedException, SecurityException, NoSuchMethodException {
216 Configuration conf = new Configuration(HBaseConfiguration.create());
217 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
218 doSimpleTest(conf, new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT));
219 }
220
221 private void doSimpleTest(final Configuration conf, final RpcClient client)
222 throws InterruptedException, IOException {
223 TestRpcServer rpcServer = new TestRpcServer();
224 List<Cell> cells = new ArrayList<Cell>();
225 int count = 3;
226 for (int i = 0; i < count; i++) cells.add(CELL);
227 try {
228 rpcServer.start();
229 InetSocketAddress address = rpcServer.getListenerAddress();
230 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
231 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
232 Pair<Message, CellScanner> r = client.call(md, param, CellUtil.createCellScanner(cells),
233 md.getOutputType().toProto(), User.getCurrent(), address, 0);
234 int index = 0;
235 while (r.getSecond().advance()) {
236 assertTrue(CELL.equals(r.getSecond().current()));
237 index++;
238 }
239 assertEquals(count, index);
240 } finally {
241 client.stop();
242 rpcServer.stop();
243 }
244 }
245
246 @Test
247 public void testRTEDuringConnectionSetup() throws Exception {
248 Configuration conf = HBaseConfiguration.create();
249 SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
250 Mockito.doAnswer(new Answer<Socket>() {
251 @Override
252 public Socket answer(InvocationOnMock invocation) throws Throwable {
253 Socket s = spy((Socket)invocation.callRealMethod());
254 doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
255 return s;
256 }
257 }).when(spyFactory).createSocket();
258
259 TestRpcServer rpcServer = new TestRpcServer();
260 RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
261 try {
262 rpcServer.start();
263 InetSocketAddress address = rpcServer.getListenerAddress();
264 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
265 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
266 client.call(md, param, null, null, User.getCurrent(), address, 0);
267 fail("Expected an exception to have been thrown!");
268 } catch (Exception e) {
269 LOG.info("Caught expected exception: " + e.toString());
270 assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
271 } finally {
272 client.stop();
273 rpcServer.stop();
274 }
275 }
276
277
278 @Test
279 public void testRpcScheduler() throws IOException, InterruptedException {
280 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
281 RpcServer rpcServer = new TestRpcServer(scheduler);
282 verify(scheduler).init((RpcScheduler.Context) anyObject());
283 RpcClient client = new RpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT);
284 try {
285 rpcServer.start();
286 verify(scheduler).start();
287 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
288 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
289 for (int i = 0; i < 10; i++) {
290 client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
291 md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
292 }
293 verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
294 } finally {
295 rpcServer.stop();
296 verify(scheduler).stop();
297 }
298 }
299
300 public static void main(String[] args)
301 throws IOException, SecurityException, NoSuchMethodException, InterruptedException {
302 if (args.length != 2) {
303 System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
304 return;
305 }
306
307
308 int cycles = Integer.parseInt(args[0]);
309 int cellcount = Integer.parseInt(args[1]);
310 Configuration conf = HBaseConfiguration.create();
311 TestRpcServer rpcServer = new TestRpcServer();
312 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
313 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
314 RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
315 KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
316 Put p = new Put(kv.getRow());
317 for (int i = 0; i < cellcount; i++) {
318 p.add(kv);
319 }
320 RowMutations rm = new RowMutations(kv.getRow());
321 rm.add(p);
322 try {
323 rpcServer.start();
324 InetSocketAddress address = rpcServer.getListenerAddress();
325 long startTime = System.currentTimeMillis();
326 User user = User.getCurrent();
327 for (int i = 0; i < cycles; i++) {
328 List<CellScannable> cells = new ArrayList<CellScannable>();
329
330 ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(
331 HConstants.EMPTY_BYTE_ARRAY, rm, cells,
332 RegionAction.newBuilder(),
333 ClientProtos.Action.newBuilder(),
334 MutationProto.newBuilder());
335 builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME).
336 setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
337 if (i % 100000 == 0) {
338 LOG.info("" + i);
339
340
341
342 }
343 CellScanner cellScanner = CellUtil.createCellScanner(cells);
344 Pair<Message, CellScanner> response =
345 client.call(md, builder.build(), cellScanner, param, user, address, 0);
346
347
348
349
350
351
352 }
353 LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " +
354 (System.currentTimeMillis() - startTime) + "ms");
355 } finally {
356 client.stop();
357 rpcServer.stop();
358 }
359 }
360 }