View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValueUtil;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.classification.InterfaceStability;
32  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
33  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35  import org.apache.hadoop.hbase.protobuf.RequestConverter;
36  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  import com.google.protobuf.ServiceException;
43  
44  /**
45   * Client scanner for small scan. Generally, only one RPC is called to fetch the
46   * scan results, unless the results cross multiple regions or the row count of
47   * results excess the caching.
48   * 
49   * For small scan, it will get better performance than {@link ClientScanner}
50   */
51  @InterfaceAudience.Public
52  @InterfaceStability.Evolving
53  public class ClientSmallScanner extends ClientScanner {
54    private final Log LOG = LogFactory.getLog(this.getClass());
55    private RegionServerCallable<Result[]> smallScanCallable = null;
56    private SmallScannerCallableFactory callableFactory;
57  
58    /**
59     * Create a new ClientSmallScanner for the specified table. An HConnection
60     * will be retrieved using the passed Configuration. Note that the passed
61     * {@link Scan} 's start row maybe changed.
62     * 
63     * @param conf The {@link Configuration} to use.
64     * @param scan {@link Scan} to use in this scanner
65     * @param tableName The table that we wish to rangeGet
66     * @throws IOException
67     */
68    public ClientSmallScanner(final Configuration conf, final Scan scan,
69        final TableName tableName) throws IOException {
70      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
71    }
72  
73    /**
74     * Create a new ClientSmallScanner for the specified table. An HConnection
75     * will be retrieved using the passed Configuration. Note that the passed
76     * {@link Scan} 's start row maybe changed.
77     * @param conf
78     * @param scan
79     * @param tableName
80     * @param connection
81     * @throws IOException
82     */
83    public ClientSmallScanner(final Configuration conf, final Scan scan,
84        final TableName tableName, HConnection connection) throws IOException {
85      this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf,
86          connection.getStatisticsTracker()),
87        RpcControllerFactory.instantiate(conf));
88    }
89  
90    /**
91     * Create a new ShortClientScanner for the specified table Note that the
92     * passed {@link Scan}'s start row maybe changed changed.
93     * 
94     * @param conf The {@link Configuration} to use.
95     * @param scan {@link Scan} to use in this scanner
96     * @param tableName The table that we wish to rangeGet
97     * @param connection Connection identifying the cluster
98     * @param rpcFactory
99     * @param controllerFactory 
100    * @throws IOException
101    *           If the remote call fails
102    */
103   public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
104       HConnection connection, RpcRetryingCallerFactory rpcFactory,
105       RpcControllerFactory controllerFactory) throws IOException {
106     this(conf, scan, tableName, connection, rpcFactory, controllerFactory, new SmallScannerCallableFactory());
107   }
108 
109   /**
110    * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
111    * 's start row maybe changed changed. Intended for unit tests to provide their own
112    * {@link SmallScannerCallableFactory} implementation/mock.
113    *
114    * @param conf
115    *          The {@link Configuration} to use.
116    * @param scan
117    *          {@link Scan} to use in this scanner
118    * @param tableName
119    *          The table that we wish to rangeGet
120    * @param connection
121    *          Connection identifying the cluster
122    * @param rpcFactory
123    *          Factory used to create the {@link RpcRetryingCaller}
124    * @param controllerFactory
125    *          Factory used to access RPC payloads
126    * @param callableFactory
127    *          Factory used to create the callable for this scan
128    * @throws IOException
129    */
130   @VisibleForTesting
131   ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
132       HConnection connection, RpcRetryingCallerFactory rpcFactory,
133       RpcControllerFactory controllerFactory, SmallScannerCallableFactory callableFactory)
134       throws IOException {
135     super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
136     this.callableFactory = callableFactory;
137   }
138 
139   @Override
140   protected void initializeScannerInConstruction() throws IOException {
141     // No need to initialize the scanner when constructing instance, do it when
142     // calling next(). Do nothing here.
143   }
144 
145   /**
146    * Gets a scanner for following scan. Move to next region or continue from the
147    * last result or start from the start row.
148    * @param nbRows
149    * @param done true if Server-side says we're done scanning.
150    * @param currentRegionDone true if scan is over on current region
151    * @return true if has next scanner
152    * @throws IOException
153    */
154   private boolean nextScanner(int nbRows, final boolean done,
155       boolean currentRegionDone) throws IOException {
156     // Where to start the next getter
157     byte[] localStartKey;
158     int cacheNum = nbRows;
159     boolean regionChanged = true;
160     // if we're at end of table, close and return false to stop iterating
161     if (this.currentRegion != null && currentRegionDone) {
162       byte[] endKey = this.currentRegion.getEndKey();
163       if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
164           || checkScanStopRow(endKey) || done) {
165         close();
166         if (LOG.isDebugEnabled()) {
167           LOG.debug("Finished with small scan at " + this.currentRegion);
168         }
169         return false;
170       }
171       localStartKey = endKey;
172       if (LOG.isDebugEnabled()) {
173         LOG.debug("Finished with region " + this.currentRegion);
174       }
175     } else if (this.lastResult != null) {
176       regionChanged = false;
177       localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
178     } else {
179       localStartKey = this.scan.getStartRow();
180     }
181 
182     if (LOG.isTraceEnabled()) {
183       LOG.trace("Advancing internal small scanner to startKey at '"
184           + Bytes.toStringBinary(localStartKey) + "'");
185     }
186     smallScanCallable = callableFactory.getCallable(
187         scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory);
188     if (this.scanMetrics != null && regionChanged) {
189       this.scanMetrics.countOfRegions.incrementAndGet();
190     }
191     return true;
192   }
193 
194   @Override
195   public Result next() throws IOException {
196     // If the scanner is closed and there's nothing left in the cache, next is a
197     // no-op.
198     if (cache.size() == 0 && this.closed) {
199       return null;
200     }
201     if (cache.size() == 0) {
202       loadCache();
203     }
204 
205     if (cache.size() > 0) {
206       return cache.poll();
207     }
208     // if we exhausted this scanner before calling close, write out the scan
209     // metrics
210     writeScanMetrics();
211     return null;
212   }
213 
214   @Override
215   protected void loadCache() throws IOException {
216     Result[] values = null;
217     long remainingResultSize = maxScannerResultSize;
218     int countdown = this.caching;
219     boolean currentRegionDone = false;
220     // Values == null means server-side filter has determined we must STOP
221     while (remainingResultSize > 0 && countdown > 0
222         && nextScanner(countdown, values == null, currentRegionDone)) {
223       // Server returns a null values if scanning is to stop. Else,
224       // returns an empty array if scanning is to go on and we've just
225       // exhausted current region.
226       values = this.caller.callWithRetries(smallScanCallable);
227       this.currentRegion = smallScanCallable.getHRegionInfo();
228       long currentTime = System.currentTimeMillis();
229       if (this.scanMetrics != null) {
230         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
231             - lastNext);
232       }
233       lastNext = currentTime;
234       if (values != null && values.length > 0) {
235         for (int i = 0; i < values.length; i++) {
236           Result rs = values[i];
237           cache.add(rs);
238           for (Cell kv : rs.rawCells()) {
239             remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
240           }
241           countdown--;
242           this.lastResult = rs;
243         }
244       }
245       if (smallScanCallable.hasMoreResultsContext()) {
246         // If the server has more results, the current region is not done
247         currentRegionDone = !smallScanCallable.getServerHasMoreResults();
248       } else {
249         // not guaranteed to get the context in older versions, fall back to checking countdown
250         currentRegionDone = countdown > 0;
251       }
252     }
253   }
254 
255   public void close() {
256     if (!scanMetricsPublished) writeScanMetrics();
257     closed = true;
258   }
259 
260   @VisibleForTesting
261   protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
262     this.callableFactory = callableFactory;
263   }
264 
265   @VisibleForTesting
266   protected void setRpcRetryingCaller(RpcRetryingCaller<Result []> caller) {
267     this.caller = caller;
268   }
269 
270   @VisibleForTesting
271   protected void setRpcControllerFactory(RpcControllerFactory rpcControllerFactory) {
272     this.rpcControllerFactory = rpcControllerFactory;
273   }
274 
275   @InterfaceAudience.Private
276   protected static class SmallScannerCallableFactory {
277 
278     public RegionServerCallable<Result[]> getCallable(final Scan sc, HConnection connection,
279         TableName table, byte[] localStartKey, final int cacheNum,
280         final RpcControllerFactory rpcControllerFactory) throws IOException {
281       sc.setStartRow(localStartKey);
282       RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
283           connection, table, sc.getStartRow()) {
284         public Result[] call() throws IOException {
285           ScanRequest request = RequestConverter.buildScanRequest(getLocation()
286             .getRegionInfo().getRegionName(), sc, cacheNum, true);
287           ScanResponse response = null;
288           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
289           try {
290             controller.setPriority(getTableName());
291             response = getStub().scan(controller, request);
292             if (response.hasMoreResultsInRegion()) {
293               setHasMoreResultsContext(true);
294               setServerHasMoreResults(response.getMoreResultsInRegion());
295             } else {
296               setHasMoreResultsContext(false);
297             }
298             return ResponseConverter.getResults(controller.cellScanner(),
299                 response);
300           } catch (ServiceException se) {
301             throw ProtobufUtil.getRemoteException(se);
302           }
303         }
304       };
305       return callable;
306     }
307 
308   }
309 }