View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.KeyValue.Type;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
37  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
38  import org.apache.hadoop.hbase.testclassification.SmallTests;
39  import org.junit.After;
40  import org.junit.Before;
41  import org.junit.Test;
42  import org.junit.experimental.categories.Category;
43  import org.mockito.Mockito;
44  import org.mockito.invocation.InvocationOnMock;
45  import org.mockito.stubbing.Answer;
46  
47  /**
48   * Test the ClientSmallReversedScanner.
49   */
50  @Category(SmallTests.class)
51  public class TestClientSmallReversedScanner {
52  
53    Scan scan;
54    ExecutorService pool;
55    Configuration conf;
56  
57    HConnection clusterConn;
58    RpcRetryingCallerFactory rpcFactory;
59    RpcControllerFactory controllerFactory;
60    RpcRetryingCaller<Result[]> caller;
61  
62    @Before
63    @SuppressWarnings("unchecked")
64    public void setup() throws IOException {
65      clusterConn = Mockito.mock(HConnection.class);
66      rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
67      controllerFactory = Mockito.mock(RpcControllerFactory.class);
68      pool = Executors.newSingleThreadExecutor();
69      scan = new Scan();
70      conf = new Configuration();
71      Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
72      // Mock out the RpcCaller
73      caller = Mockito.mock(RpcRetryingCaller.class);
74      // Return the mock from the factory
75      Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
76    }
77  
78    @After
79    public void teardown() {
80      if (null != pool) {
81        pool.shutdownNow();
82      }
83    }
84  
85    /**
86     * Create a simple Answer which returns true the first time, and false every time after.
87     */
88    private Answer<Boolean> createTrueThenFalseAnswer() {
89      return new Answer<Boolean>() {
90        boolean first = true;
91  
92        @Override
93        public Boolean answer(InvocationOnMock invocation) {
94          if (first) {
95            first = false;
96            return true;
97          }
98          return false;
99        }
100     };
101   }
102 
103   private SmallScannerCallableFactory getFactory(
104       final RegionServerCallable<Result[]> callableWithReplicas) {
105     return new SmallScannerCallableFactory() {
106       @Override
107       public RegionServerCallable<Result[]> getCallable(final Scan sc, HConnection connection,
108           TableName table, byte[] localStartKey, final int cacheNum,
109           final RpcControllerFactory rpcControllerFactory) {
110         return callableWithReplicas;
111       }
112     };
113   }
114 
115   @Test
116   public void testContextPresent() throws Exception {
117     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
118         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
119         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
120         Type.Maximum);
121 
122     @SuppressWarnings("unchecked")
123     RegionServerCallable<Result[]> callableWithReplicas = Mockito
124         .mock(RegionServerCallable.class);
125 
126     // Mock out the RpcCaller
127     @SuppressWarnings("unchecked")
128     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
129     // Return the mock from the factory
130     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
131 
132     // Intentionally leave a "default" caching size in the Scan. No matter the value, we
133     // should continue based on the server context
134 
135     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
136 
137     ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
138         TableName.valueOf("table"), clusterConn);
139 
140     try {
141       csrs.setRpcRetryingCaller(caller);
142       csrs.setRpcControllerFactory(controllerFactory);
143       csrs.setScannerCallableFactory(factory);
144 
145       // Return some data the first time, less the second, and none after that
146       Mockito.when(
147           caller.callWithRetries(callableWithReplicas,
148               HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenAnswer(
149           new Answer<Result[]>() {
150             int count = 0;
151 
152             @Override
153             public Result[] answer(InvocationOnMock invocation) {
154               Result[] results;
155               if (0 == count) {
156                 results = new Result[] {Result.create(new Cell[] {kv3}),
157                     Result.create(new Cell[] {kv2})};
158               } else if (1 == count) {
159                 results = new Result[] {Result.create(new Cell[] {kv1})};
160               } else {
161                 results = new Result[0];
162               }
163               count++;
164               return results;
165             }
166           });
167 
168       // Pass back the context always
169       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
170       // Only have more results the first time
171       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
172           createTrueThenFalseAnswer());
173 
174       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
175       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
176       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
177       // Trigger the "no more data" branch for #nextScanner(...)
178       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
179 
180       csrs.loadCache();
181 
182       List<Result> results = csrs.cache;
183       Iterator<Result> iter = results.iterator();
184       assertEquals(3, results.size());
185       for (int i = 3; i >= 1 && iter.hasNext(); i--) {
186         Result result = iter.next();
187         byte[] row = result.getRow();
188         assertEquals("row" + i, new String(row, "UTF-8"));
189         assertEquals(1, result.getMap().size());
190       }
191       assertTrue(csrs.closed);
192     } finally {
193       csrs.close();
194     }
195   }
196 
197   @Test
198   public void testNoContextFewerRecords() throws Exception {
199     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
200         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
201         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
202         Type.Maximum);
203 
204     @SuppressWarnings("unchecked")
205     RegionServerCallable<Result[]> callableWithReplicas = Mockito
206         .mock(RegionServerCallable.class);
207 
208     // While the server returns 2 records per batch, we expect more records.
209     scan.setCaching(2);
210 
211     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
212 
213     ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
214         TableName.valueOf("table"), clusterConn);
215 
216     try {
217       csrs.setRpcRetryingCaller(caller);
218       csrs.setRpcControllerFactory(controllerFactory);
219       csrs.setScannerCallableFactory(factory);
220 
221       // Return some data the first time, less the second, and none after that
222       Mockito.when(
223           caller.callWithRetries(callableWithReplicas,
224               HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenAnswer(
225           new Answer<Result[]>() {
226             int count = 0;
227 
228             @Override
229             public Result[] answer(InvocationOnMock invocation) {
230               Result[] results;
231               if (0 == count) {
232                 results = new Result[] {Result.create(new Cell[] {kv3}),
233                     Result.create(new Cell[] {kv2})};
234               } else if (1 == count) {
235                 // Return fewer records than expected (2)
236                 results = new Result[] {Result.create(new Cell[] {kv1})};
237               } else {
238                 throw new RuntimeException("Should not fetch a third batch from the server");
239               }
240               count++;
241               return results;
242             }
243           });
244 
245       // Server doesn't return the context
246       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
247       // getServerHasMoreResults shouldn't be called when hasMoreResultsContext returns false
248       Mockito.when(callableWithReplicas.getServerHasMoreResults())
249           .thenThrow(new RuntimeException("Should not be called"));
250 
251       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
252       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
253       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
254       // Trigger the "no more data" branch for #nextScanner(...)
255       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
256 
257       csrs.loadCache();
258 
259       List<Result> results = csrs.cache;
260       Iterator<Result> iter = results.iterator();
261       assertEquals(2, results.size());
262       for (int i = 3; i >= 2 && iter.hasNext(); i--) {
263         Result result = iter.next();
264         byte[] row = result.getRow();
265         assertEquals("row" + i, new String(row, "UTF-8"));
266         assertEquals(1, result.getMap().size());
267       }
268 
269       // "consume" the Results
270       results.clear();
271 
272       csrs.loadCache();
273 
274       assertEquals(1, results.size());
275       Result result = results.get(0);
276       assertEquals("row1", new String(result.getRow(), "UTF-8"));
277       assertEquals(1, result.getMap().size());
278 
279       assertTrue(csrs.closed);
280     } finally {
281       csrs.close();
282     }
283   }
284 
285   @Test
286   public void testNoContextNoRecords() throws Exception {
287     @SuppressWarnings("unchecked")
288     RegionServerCallable<Result[]> callableWithReplicas = Mockito
289         .mock(RegionServerCallable.class);
290 
291     // While the server return 2 records per RPC, we expect there to be more records.
292     scan.setCaching(2);
293 
294     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
295 
296     ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
297         TableName.valueOf("table"), clusterConn);
298 
299     try {
300       csrs.setRpcRetryingCaller(caller);
301       csrs.setRpcControllerFactory(controllerFactory);
302       csrs.setScannerCallableFactory(factory);
303 
304       // Return some data the first time, less the second, and none after that
305       Mockito.when(
306           caller.callWithRetries(callableWithReplicas,
307               HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenReturn(new Result[0]);
308 
309       // Server doesn't return the context
310       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
311       // Only have more results the first time
312       Mockito.when(callableWithReplicas.getServerHasMoreResults())
313           .thenThrow(new RuntimeException("Should not be called"));
314 
315       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
316       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
317       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
318       // Trigger the "no more data" branch for #nextScanner(...)
319       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
320 
321       csrs.loadCache();
322 
323       assertEquals(0, csrs.cache.size());
324       assertTrue(csrs.closed);
325     } finally {
326       csrs.close();
327     }
328   }
329 
330   @Test
331   public void testContextNoRecords() throws Exception {
332     @SuppressWarnings("unchecked")
333     RegionServerCallable<Result[]> callableWithReplicas = Mockito
334         .mock(RegionServerCallable.class);
335 
336     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
337 
338     ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
339         TableName.valueOf("table"), clusterConn);
340 
341     try {
342       csrs.setRpcRetryingCaller(caller);
343       csrs.setRpcControllerFactory(controllerFactory);
344       csrs.setScannerCallableFactory(factory);
345 
346       // Return some data the first time, less the second, and none after that
347       Mockito.when(
348           caller.callWithRetries(callableWithReplicas,
349               HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)).thenReturn(new Result[0]);
350 
351       // Server doesn't return the context
352       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
353       // Only have more results the first time
354       Mockito.when(callableWithReplicas.getServerHasMoreResults())
355           .thenReturn(false);
356 
357       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
358       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
359       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
360       // Trigger the "no more data" branch for #nextScanner(...)
361       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
362 
363       csrs.loadCache();
364 
365       assertEquals(0, csrs.cache.size());
366       assertTrue(csrs.closed);
367     } finally {
368       csrs.close();
369     }
370   }
371 }