View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.List;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValue.Type;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
36  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
37  import org.apache.hadoop.hbase.testclassification.SmallTests;
38  import org.junit.After;
39  import org.junit.Before;
40  import org.junit.Test;
41  import org.junit.experimental.categories.Category;
42  import org.mockito.Mockito;
43  import org.mockito.invocation.InvocationOnMock;
44  import org.mockito.stubbing.Answer;
45  
46  /**
47   * Test the ClientSmallScanner.
48   */
49  @Category(SmallTests.class)
50  public class TestClientSmallScanner {
51  
52    Scan scan;
53    ExecutorService pool;
54    Configuration conf;
55  
56    HConnection clusterConn;
57    RpcRetryingCallerFactory rpcFactory;
58    RpcControllerFactory controllerFactory;
59    RpcRetryingCaller<Result[]> caller;
60  
61    @Before
62    @SuppressWarnings("unchecked")
63    public void setup() throws IOException {
64      clusterConn = Mockito.mock(HConnection.class);
65      rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
66      controllerFactory = Mockito.mock(RpcControllerFactory.class);
67      pool = Executors.newSingleThreadExecutor();
68      scan = new Scan();
69      conf = new Configuration();
70      Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
71      // Mock out the RpcCaller
72      caller = Mockito.mock(RpcRetryingCaller.class);
73      // Return the mock from the factory
74      Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
75    }
76  
77    @After
78    public void teardown() {
79      if (null != pool) {
80        pool.shutdownNow();
81      }
82    }
83  
84    /**
85     * Create a simple Answer which returns true the first time, and false every time after.
86     */
87    private Answer<Boolean> createTrueThenFalseAnswer() {
88      return new Answer<Boolean>() {
89        boolean first = true;
90  
91        @Override
92        public Boolean answer(InvocationOnMock invocation) {
93          if (first) {
94            first = false;
95            return true;
96          }
97          return false;
98        }
99      };
100   }
101 
102   private SmallScannerCallableFactory getFactory(
103       final RegionServerCallable<Result[]> callableWithReplicas) {
104     return new SmallScannerCallableFactory() {
105       @Override
106       public RegionServerCallable<Result[]> getCallable(final Scan sc, HConnection connection,
107           TableName table, byte[] localStartKey, final int cacheNum,
108           final RpcControllerFactory rpcControllerFactory) {
109         return callableWithReplicas;
110       }
111     };
112   }
113 
114   @Test
115   public void testContextPresent() throws Exception {
116     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
117         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
118         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
119         Type.Maximum);
120 
121     @SuppressWarnings("unchecked")
122     RegionServerCallable<Result[]> callableWithReplicas = Mockito
123         .mock(RegionServerCallable.class);
124 
125     // Mock out the RpcCaller
126     @SuppressWarnings("unchecked")
127     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
128     // Return the mock from the factory
129     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
130 
131     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
132 
133     // Intentionally leave a "default" caching size in the Scan. No matter the value, we
134     // should continue based on the server context
135 
136     ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
137         clusterConn);
138     try {
139       css.setRpcRetryingCaller(caller);
140       css.setRpcControllerFactory(controllerFactory);
141       css.setScannerCallableFactory(factory);
142 
143       // Return some data the first time, less the second, and none after that
144       Mockito.when(caller.callWithRetries(callableWithReplicas)).thenAnswer(new Answer<Result[]>() {
145         int count = 0;
146 
147         @Override
148         public Result[] answer(InvocationOnMock invocation) {
149           Result[] results;
150           if (0 == count) {
151             results = new Result[] {Result.create(new Cell[] {kv1}),
152                 Result.create(new Cell[] {kv2})};
153           } else if (1 == count) {
154             results = new Result[] {Result.create(new Cell[] {kv3})};
155           } else {
156             results = new Result[0];
157           }
158           count++;
159           return results;
160         }
161       });
162 
163       // Pass back the context always
164       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
165       // Only have more results the first time
166       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
167           createTrueThenFalseAnswer());
168 
169       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
170       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
171       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
172       // Trigger the "no more data" branch for #nextScanner(...)
173       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
174 
175       css.loadCache();
176 
177       List<Result> results = css.cache;
178       assertEquals(3, results.size());
179       for (int i = 1; i <= 3; i++) {
180         Result result = results.get(i - 1);
181         byte[] row = result.getRow();
182         assertEquals("row" + i, new String(row, "UTF-8"));
183         assertEquals(1, result.getMap().size());
184       }
185 
186       assertTrue(css.closed);
187     } finally {
188       css.close();
189     }
190   }
191 
192   @Test
193   public void testNoContextFewerRecords() throws Exception {
194     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
195         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
196         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
197         Type.Maximum);
198 
199     @SuppressWarnings("unchecked")
200     RegionServerCallable<Result[]> callableWithReplicas = Mockito
201         .mock(RegionServerCallable.class);
202 
203     // While the server returns 2 records per batch, we expect more records.
204     scan.setCaching(2);
205     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
206 
207     ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
208         clusterConn);
209     try {
210       css.setRpcRetryingCaller(caller);
211       css.setRpcControllerFactory(controllerFactory);
212       css.setScannerCallableFactory(factory);
213       // Return some data the first time, less the second, and none after that
214       Mockito.when(caller.callWithRetries(callableWithReplicas)).thenAnswer(new Answer<Result[]>() {
215         int count = 0;
216 
217         @Override
218         public Result[] answer(InvocationOnMock invocation) {
219           Result[] results;
220           if (0 == count) {
221             results = new Result[] {Result.create(new Cell[] {kv1}),
222                 Result.create(new Cell[] {kv2})};
223           } else if (1 == count) {
224             // Return fewer records than expected (2)
225             results = new Result[] {Result.create(new Cell[] {kv3})};
226           } else {
227             throw new RuntimeException("Should not fetch a third batch from the server");
228           }
229           count++;
230           return results;
231         }
232       });
233 
234       // Server doesn't return the context
235       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
236       // Only have more results the first time
237       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
238           new RuntimeException("Should not be called"));
239 
240       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
241       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
242       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
243       // Trigger the "no more data" branch for #nextScanner(...)
244       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
245 
246       css.loadCache();
247 
248       List<Result> results = css.cache;
249       assertEquals(2, results.size());
250       for (int i = 1; i <= 2; i++) {
251         Result result = results.get(i - 1);
252         byte[] row = result.getRow();
253         assertEquals("row" + i, new String(row, "UTF-8"));
254         assertEquals(1, result.getMap().size());
255       }
256 
257       // "consume" the results we verified
258       results.clear();
259 
260       css.loadCache();
261 
262       assertEquals(1, results.size());
263       Result result = results.get(0);
264       assertEquals("row3", new String(result.getRow(), "UTF-8"));
265       assertEquals(1, result.getMap().size());
266       assertTrue(css.closed);
267     } finally {
268       css.close();
269     }
270   }
271 
272   @Test
273   public void testNoContextNoRecords() throws Exception {
274     @SuppressWarnings("unchecked")
275     RegionServerCallable<Result[]> callableWithReplicas = Mockito
276         .mock(RegionServerCallable.class);
277 
278     // While the server return 2 records per RPC, we expect there to be more records.
279     scan.setCaching(2);
280 
281     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
282 
283     ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
284         clusterConn);
285     try {
286       css.setRpcRetryingCaller(caller);
287       css.setRpcControllerFactory(controllerFactory);
288       css.setScannerCallableFactory(factory);
289 
290       // Return some data the first time, less the second, and none after that
291       Mockito.when(caller.callWithRetries(callableWithReplicas)).thenReturn(new Result[0]);
292 
293       // Server doesn't return the context
294       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
295       // Only have more results the first time
296       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
297           new RuntimeException("Should not be called"));
298 
299       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
300       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
301       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
302       // Trigger the "no more data" branch for #nextScanner(...)
303       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
304 
305       css.loadCache();
306 
307       assertEquals(0, css.cache.size());
308       assertTrue(css.closed);
309     } finally {
310       css.close();
311     }
312   }
313 
314   @Test
315   public void testContextNoRecords() throws Exception {
316     @SuppressWarnings("unchecked")
317     RegionServerCallable<Result[]> callableWithReplicas = Mockito
318         .mock(RegionServerCallable.class);
319 
320     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
321 
322     ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
323         clusterConn);
324     try {
325       css.setRpcRetryingCaller(caller);
326       css.setRpcControllerFactory(controllerFactory);
327       css.setScannerCallableFactory(factory);
328 
329       // Return some data the first time, less the second, and none after that
330       Mockito.when(caller.callWithRetries(callableWithReplicas)).thenReturn(new Result[0]);
331 
332       // Server doesn't return the context
333       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
334       // Only have more results the first time
335       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false);
336 
337       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
338       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
339       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
340       // Trigger the "no more data" branch for #nextScanner(...)
341       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
342 
343       css.loadCache();
344 
345       assertEquals(0, css.cache.size());
346       assertTrue(css.closed);
347     } finally {
348       css.close();
349     }
350   }
351 }