1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.KeyValue.Type;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
37 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
38 import org.apache.hadoop.hbase.testclassification.SmallTests;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43 import org.mockito.Mockito;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46
47
48
49
50 @Category(SmallTests.class)
51 public class TestClientSmallReversedScanner {
52
53 Scan scan;
54 ExecutorService pool;
55 Configuration conf;
56
57 HConnection clusterConn;
58 RpcRetryingCallerFactory rpcFactory;
59 RpcControllerFactory controllerFactory;
60 RpcRetryingCaller<Result[]> caller;
61
62 @Before
63 @SuppressWarnings("unchecked")
64 public void setup() throws IOException {
65 clusterConn = Mockito.mock(HConnection.class);
66 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
67 controllerFactory = Mockito.mock(RpcControllerFactory.class);
68 pool = Executors.newSingleThreadExecutor();
69 scan = new Scan();
70 conf = new Configuration();
71 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
72
73 caller = Mockito.mock(RpcRetryingCaller.class);
74
75 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
76 }
77
78 @After
79 public void teardown() {
80 if (null != pool) {
81 pool.shutdownNow();
82 }
83 }
84
85
86
87
88 private Answer<Boolean> createTrueThenFalseAnswer() {
89 return new Answer<Boolean>() {
90 boolean first = true;
91
92 @Override
93 public Boolean answer(InvocationOnMock invocation) {
94 if (first) {
95 first = false;
96 return true;
97 }
98 return false;
99 }
100 };
101 }
102
103 private SmallScannerCallableFactory getFactory(
104 final RegionServerCallable<Result[]> callableWithReplicas) {
105 return new SmallScannerCallableFactory() {
106 @Override
107 public RegionServerCallable<Result[]> getCallable(final Scan sc, HConnection connection,
108 TableName table, byte[] localStartKey, final int cacheNum,
109 final RpcControllerFactory rpcControllerFactory) {
110 return callableWithReplicas;
111 }
112 };
113 }
114
115 @Test
116 public void testContextPresent() throws Exception {
117 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
118 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
119 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
120 Type.Maximum);
121
122 @SuppressWarnings("unchecked")
123 RegionServerCallable<Result[]> callableWithReplicas = Mockito
124 .mock(RegionServerCallable.class);
125
126
127 @SuppressWarnings("unchecked")
128 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
129
130 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
131
132
133
134
135 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
136
137 ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
138 TableName.valueOf("table"), clusterConn);
139
140 try {
141 csrs.setRpcRetryingCaller(caller);
142 csrs.setRpcControllerFactory(controllerFactory);
143 csrs.setScannerCallableFactory(factory);
144
145
146 Mockito.when(
147 caller.callWithRetries(callableWithReplicas,
148 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenAnswer(
149 new Answer<Result[]>() {
150 int count = 0;
151
152 @Override
153 public Result[] answer(InvocationOnMock invocation) {
154 Result[] results;
155 if (0 == count) {
156 results = new Result[] {Result.create(new Cell[] {kv3}),
157 Result.create(new Cell[] {kv2})};
158 } else if (1 == count) {
159 results = new Result[] {Result.create(new Cell[] {kv1})};
160 } else {
161 results = new Result[0];
162 }
163 count++;
164 return results;
165 }
166 });
167
168
169 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
170
171 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
172 createTrueThenFalseAnswer());
173
174
175 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
176 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
177
178 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
179
180 csrs.loadCache();
181
182 List<Result> results = csrs.cache;
183 Iterator<Result> iter = results.iterator();
184 assertEquals(3, results.size());
185 for (int i = 3; i >= 1 && iter.hasNext(); i--) {
186 Result result = iter.next();
187 byte[] row = result.getRow();
188 assertEquals("row" + i, new String(row, "UTF-8"));
189 assertEquals(1, result.getMap().size());
190 }
191 assertTrue(csrs.closed);
192 } finally {
193 csrs.close();
194 }
195 }
196
197 @Test
198 public void testNoContextFewerRecords() throws Exception {
199 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
200 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
201 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
202 Type.Maximum);
203
204 @SuppressWarnings("unchecked")
205 RegionServerCallable<Result[]> callableWithReplicas = Mockito
206 .mock(RegionServerCallable.class);
207
208
209 scan.setCaching(2);
210
211 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
212
213 ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
214 TableName.valueOf("table"), clusterConn);
215
216 try {
217 csrs.setRpcRetryingCaller(caller);
218 csrs.setRpcControllerFactory(controllerFactory);
219 csrs.setScannerCallableFactory(factory);
220
221
222 Mockito.when(
223 caller.callWithRetries(callableWithReplicas,
224 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenAnswer(
225 new Answer<Result[]>() {
226 int count = 0;
227
228 @Override
229 public Result[] answer(InvocationOnMock invocation) {
230 Result[] results;
231 if (0 == count) {
232 results = new Result[] {Result.create(new Cell[] {kv3}),
233 Result.create(new Cell[] {kv2})};
234 } else if (1 == count) {
235
236 results = new Result[] {Result.create(new Cell[] {kv1})};
237 } else {
238 throw new RuntimeException("Should not fetch a third batch from the server");
239 }
240 count++;
241 return results;
242 }
243 });
244
245
246 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
247
248 Mockito.when(callableWithReplicas.getServerHasMoreResults())
249 .thenThrow(new RuntimeException("Should not be called"));
250
251
252 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
253 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
254
255 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
256
257 csrs.loadCache();
258
259 List<Result> results = csrs.cache;
260 Iterator<Result> iter = results.iterator();
261 assertEquals(2, results.size());
262 for (int i = 3; i >= 2 && iter.hasNext(); i--) {
263 Result result = iter.next();
264 byte[] row = result.getRow();
265 assertEquals("row" + i, new String(row, "UTF-8"));
266 assertEquals(1, result.getMap().size());
267 }
268
269
270 results.clear();
271
272 csrs.loadCache();
273
274 assertEquals(1, results.size());
275 Result result = results.get(0);
276 assertEquals("row1", new String(result.getRow(), "UTF-8"));
277 assertEquals(1, result.getMap().size());
278
279 assertTrue(csrs.closed);
280 } finally {
281 csrs.close();
282 }
283 }
284
285 @Test
286 public void testNoContextNoRecords() throws Exception {
287 @SuppressWarnings("unchecked")
288 RegionServerCallable<Result[]> callableWithReplicas = Mockito
289 .mock(RegionServerCallable.class);
290
291
292 scan.setCaching(2);
293
294 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
295
296 ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
297 TableName.valueOf("table"), clusterConn);
298
299 try {
300 csrs.setRpcRetryingCaller(caller);
301 csrs.setRpcControllerFactory(controllerFactory);
302 csrs.setScannerCallableFactory(factory);
303
304
305 Mockito.when(
306 caller.callWithRetries(callableWithReplicas,
307 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenReturn(new Result[0]);
308
309
310 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
311
312 Mockito.when(callableWithReplicas.getServerHasMoreResults())
313 .thenThrow(new RuntimeException("Should not be called"));
314
315
316 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
317 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
318
319 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
320
321 csrs.loadCache();
322
323 assertEquals(0, csrs.cache.size());
324 assertTrue(csrs.closed);
325 } finally {
326 csrs.close();
327 }
328 }
329
330 @Test
331 public void testContextNoRecords() throws Exception {
332 @SuppressWarnings("unchecked")
333 RegionServerCallable<Result[]> callableWithReplicas = Mockito
334 .mock(RegionServerCallable.class);
335
336 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
337
338 ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
339 TableName.valueOf("table"), clusterConn);
340
341 try {
342 csrs.setRpcRetryingCaller(caller);
343 csrs.setRpcControllerFactory(controllerFactory);
344 csrs.setScannerCallableFactory(factory);
345
346
347 Mockito.when(
348 caller.callWithRetries(callableWithReplicas,
349 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenReturn(new Result[0]);
350
351
352 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
353
354 Mockito.when(callableWithReplicas.getServerHasMoreResults())
355 .thenReturn(false);
356
357
358 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
359 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
360
361 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
362
363 csrs.loadCache();
364
365 assertEquals(0, csrs.cache.size());
366 assertTrue(csrs.closed);
367 } finally {
368 csrs.close();
369 }
370 }
371 }