1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.KeyValue.Type;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
36 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
37 import org.apache.hadoop.hbase.testclassification.SmallTests;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.experimental.categories.Category;
42 import org.mockito.Mockito;
43 import org.mockito.invocation.InvocationOnMock;
44 import org.mockito.stubbing.Answer;
45
46
47
48
49 @Category(SmallTests.class)
50 public class TestClientSmallScanner {
51
52 Scan scan;
53 ExecutorService pool;
54 Configuration conf;
55
56 HConnection clusterConn;
57 RpcRetryingCallerFactory rpcFactory;
58 RpcControllerFactory controllerFactory;
59 RpcRetryingCaller<Result[]> caller;
60
61 @Before
62 @SuppressWarnings("unchecked")
63 public void setup() throws IOException {
64 clusterConn = Mockito.mock(HConnection.class);
65 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
66 controllerFactory = Mockito.mock(RpcControllerFactory.class);
67 pool = Executors.newSingleThreadExecutor();
68 scan = new Scan();
69 conf = new Configuration();
70 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
71
72 caller = Mockito.mock(RpcRetryingCaller.class);
73
74 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
75 }
76
77 @After
78 public void teardown() {
79 if (null != pool) {
80 pool.shutdownNow();
81 }
82 }
83
84
85
86
87 private Answer<Boolean> createTrueThenFalseAnswer() {
88 return new Answer<Boolean>() {
89 boolean first = true;
90
91 @Override
92 public Boolean answer(InvocationOnMock invocation) {
93 if (first) {
94 first = false;
95 return true;
96 }
97 return false;
98 }
99 };
100 }
101
102 private SmallScannerCallableFactory getFactory(
103 final RegionServerCallable<Result[]> callableWithReplicas) {
104 return new SmallScannerCallableFactory() {
105 @Override
106 public RegionServerCallable<Result[]> getCallable(final Scan sc, HConnection connection,
107 TableName table, byte[] localStartKey, final int cacheNum,
108 final RpcControllerFactory rpcControllerFactory) {
109 return callableWithReplicas;
110 }
111 };
112 }
113
114 @Test
115 public void testContextPresent() throws Exception {
116 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
117 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
118 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
119 Type.Maximum);
120
121 @SuppressWarnings("unchecked")
122 RegionServerCallable<Result[]> callableWithReplicas = Mockito
123 .mock(RegionServerCallable.class);
124
125
126 @SuppressWarnings("unchecked")
127 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
128
129 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
130
131 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
132
133
134
135
136 ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
137 clusterConn);
138 try {
139 css.setRpcRetryingCaller(caller);
140 css.setRpcControllerFactory(controllerFactory);
141 css.setScannerCallableFactory(factory);
142
143
144 Mockito.when(caller.callWithRetries(callableWithReplicas)).thenAnswer(new Answer<Result[]>() {
145 int count = 0;
146
147 @Override
148 public Result[] answer(InvocationOnMock invocation) {
149 Result[] results;
150 if (0 == count) {
151 results = new Result[] {Result.create(new Cell[] {kv1}),
152 Result.create(new Cell[] {kv2})};
153 } else if (1 == count) {
154 results = new Result[] {Result.create(new Cell[] {kv3})};
155 } else {
156 results = new Result[0];
157 }
158 count++;
159 return results;
160 }
161 });
162
163
164 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
165
166 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
167 createTrueThenFalseAnswer());
168
169
170 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
171 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
172
173 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
174
175 css.loadCache();
176
177 List<Result> results = css.cache;
178 assertEquals(3, results.size());
179 for (int i = 1; i <= 3; i++) {
180 Result result = results.get(i - 1);
181 byte[] row = result.getRow();
182 assertEquals("row" + i, new String(row, "UTF-8"));
183 assertEquals(1, result.getMap().size());
184 }
185
186 assertTrue(css.closed);
187 } finally {
188 css.close();
189 }
190 }
191
192 @Test
193 public void testNoContextFewerRecords() throws Exception {
194 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
195 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
196 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
197 Type.Maximum);
198
199 @SuppressWarnings("unchecked")
200 RegionServerCallable<Result[]> callableWithReplicas = Mockito
201 .mock(RegionServerCallable.class);
202
203
204 scan.setCaching(2);
205 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
206
207 ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
208 clusterConn);
209 try {
210 css.setRpcRetryingCaller(caller);
211 css.setRpcControllerFactory(controllerFactory);
212 css.setScannerCallableFactory(factory);
213
214 Mockito.when(caller.callWithRetries(callableWithReplicas)).thenAnswer(new Answer<Result[]>() {
215 int count = 0;
216
217 @Override
218 public Result[] answer(InvocationOnMock invocation) {
219 Result[] results;
220 if (0 == count) {
221 results = new Result[] {Result.create(new Cell[] {kv1}),
222 Result.create(new Cell[] {kv2})};
223 } else if (1 == count) {
224
225 results = new Result[] {Result.create(new Cell[] {kv3})};
226 } else {
227 throw new RuntimeException("Should not fetch a third batch from the server");
228 }
229 count++;
230 return results;
231 }
232 });
233
234
235 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
236
237 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
238 new RuntimeException("Should not be called"));
239
240
241 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
242 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
243
244 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
245
246 css.loadCache();
247
248 List<Result> results = css.cache;
249 assertEquals(2, results.size());
250 for (int i = 1; i <= 2; i++) {
251 Result result = results.get(i - 1);
252 byte[] row = result.getRow();
253 assertEquals("row" + i, new String(row, "UTF-8"));
254 assertEquals(1, result.getMap().size());
255 }
256
257
258 results.clear();
259
260 css.loadCache();
261
262 assertEquals(1, results.size());
263 Result result = results.get(0);
264 assertEquals("row3", new String(result.getRow(), "UTF-8"));
265 assertEquals(1, result.getMap().size());
266 assertTrue(css.closed);
267 } finally {
268 css.close();
269 }
270 }
271
272 @Test
273 public void testNoContextNoRecords() throws Exception {
274 @SuppressWarnings("unchecked")
275 RegionServerCallable<Result[]> callableWithReplicas = Mockito
276 .mock(RegionServerCallable.class);
277
278
279 scan.setCaching(2);
280
281 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
282
283 ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
284 clusterConn);
285 try {
286 css.setRpcRetryingCaller(caller);
287 css.setRpcControllerFactory(controllerFactory);
288 css.setScannerCallableFactory(factory);
289
290
291 Mockito.when(caller.callWithRetries(callableWithReplicas)).thenReturn(new Result[0]);
292
293
294 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
295
296 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
297 new RuntimeException("Should not be called"));
298
299
300 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
301 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
302
303 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
304
305 css.loadCache();
306
307 assertEquals(0, css.cache.size());
308 assertTrue(css.closed);
309 } finally {
310 css.close();
311 }
312 }
313
314 @Test
315 public void testContextNoRecords() throws Exception {
316 @SuppressWarnings("unchecked")
317 RegionServerCallable<Result[]> callableWithReplicas = Mockito
318 .mock(RegionServerCallable.class);
319
320 SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
321
322 ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
323 clusterConn);
324 try {
325 css.setRpcRetryingCaller(caller);
326 css.setRpcControllerFactory(controllerFactory);
327 css.setScannerCallableFactory(factory);
328
329
330 Mockito.when(caller.callWithRetries(callableWithReplicas)).thenReturn(new Result[0]);
331
332
333 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
334
335 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false);
336
337
338 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
339 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
340
341 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
342
343 css.loadCache();
344
345 assertEquals(0, css.cache.size());
346 assertTrue(css.closed);
347 } finally {
348 css.close();
349 }
350 }
351 }