View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.util.LinkedList;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.DoNotRetryIOException;
30  import org.apache.hadoop.hbase.HBaseConfiguration;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.UnknownScannerException;
37  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
38  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
41  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
42  import org.apache.hadoop.hbase.util.Bytes;
43  
44  /**
45   * Implements the scanner interface for the HBase client.
46   * If there are multiple regions in a table, this scanner will iterate
47   * through them all.
48   */
49  @InterfaceAudience.Public
50  @InterfaceStability.Stable
51  public class ClientScanner extends AbstractClientScanner {
    // Per-instance logger named after the runtime class, so a subclass logs under its own name.
    private final Log LOG = LogFactory.getLog(this.getClass());
    // The scan being executed.  Its start row is rewritten by getScannerCallable() and,
    // when the scanner has to be reset, by loadCache().
    protected Scan scan;
    // Set to true by close().
    protected boolean closed = false;
    // Current region scanner is against.  Gets cleared if current region goes
    // wonky: e.g. if it splits on us.
    protected HRegionInfo currentRegion = null;
    // Callable for the region scanner currently in use; null when none is open.
    protected ScannerCallable callable = null;
    // Results fetched by loadCache() that next() has not handed out yet.
    protected final LinkedList<Result> cache = new LinkedList<Result>();
    // Number of rows to ask for per fetch: taken from the Scan if positive, otherwise from
    // the configuration.
    protected final int caching;
    // Wall-clock time (ms) of construction or of the most recent fetch completed by loadCache().
    protected long lastNext;
    // Keep lastResult returned successfully in case we have to reset scanner.
    protected Result lastResult = null;
    // Byte budget for one loadCache() invocation: taken from the Scan if positive, otherwise
    // from the configuration.
    protected final long maxScannerResultSize;
    // Connection handed to every ScannerCallable this scanner creates.
    private final HConnection connection;
    // Table being scanned.
    private final TableName tableName;
    // Scanner timeout in ms, read from the configuration; loadCache() uses it to decide
    // whether an UnknownScannerException is reported as a ScannerTimeoutException.
    protected final int scannerTimeout;
    // True once writeScanMetrics() has stored the metrics on the scan.
    protected boolean scanMetricsPublished = false;
    // Executes the scanner callables with retries (callWithRetries).
    protected RpcRetryingCaller<Result []> caller;
    // Supplies the controller passed to each new ScannerCallable.
    protected RpcControllerFactory rpcControllerFactory;
71  
72      /**
73       * Create a new ClientScanner for the specified table. An HConnection will be
74       * retrieved using the passed Configuration.
75       * Note that the passed {@link Scan}'s start row maybe changed changed.
76       *
77       * @param conf The {@link Configuration} to use.
78       * @param scan {@link Scan} to use in this scanner
79       * @param tableName The table that we wish to scan
80       * @throws IOException
81       */
82      @Deprecated
83      public ClientScanner(final Configuration conf, final Scan scan,
84          final TableName tableName) throws IOException {
85        this(conf, scan, tableName, HConnectionManager.getConnection(conf));
86      }
87  
88      /**
89       * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
90       */
91      @Deprecated
92      public ClientScanner(final Configuration conf, final Scan scan,
93          final byte [] tableName) throws IOException {
94        this(conf, scan, TableName.valueOf(tableName));
95      }
96  
97  
98      /**
99       * Create a new ClientScanner for the specified table
100      * Note that the passed {@link Scan}'s start row maybe changed changed.
101      *
102      * @param conf The {@link Configuration} to use.
103      * @param scan {@link Scan} to use in this scanner
104      * @param tableName The table that we wish to scan
105      * @param connection Connection identifying the cluster
106      * @throws IOException
107      */
108   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
109       HConnection connection) throws IOException {
110     this(conf, scan, tableName, connection,
111       RpcRetryingCallerFactory.instantiate(conf, connection.getStatisticsTracker()),
112         RpcControllerFactory.instantiate(conf));
113   }
114 
115   /**
116    * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
117    */
118   @Deprecated
119   public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
120       HConnection connection) throws IOException {
121     this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
122         RpcControllerFactory.instantiate(conf));
123   }
124 
125   /**
126    * @deprecated Use
127    *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
128    *             RpcRetryingCallerFactory, RpcControllerFactory)}
129    *             instead
130    */
131   @Deprecated
132   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
133       HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
134     this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
135   }
136 
137   /**
138    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
139    * row maybe changed changed.
140    * @param conf The {@link Configuration} to use.
141    * @param scan {@link Scan} to use in this scanner
142    * @param tableName The table that we wish to scan
143    * @param connection Connection identifying the cluster
144    * @throws IOException
145    */
146   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
147       HConnection connection, RpcRetryingCallerFactory rpcFactory,
148       RpcControllerFactory controllerFactory) throws IOException {
149       if (LOG.isTraceEnabled()) {
150         LOG.trace("Scan table=" + tableName
151             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
152       }
153       this.scan = scan;
154       this.tableName = tableName;
155       this.lastNext = System.currentTimeMillis();
156       this.connection = connection;
157       if (scan.getMaxResultSize() > 0) {
158         this.maxScannerResultSize = scan.getMaxResultSize();
159       } else {
160         this.maxScannerResultSize = conf.getLong(
161           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
162           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
163       }
164       this.scannerTimeout = HBaseConfiguration.getInt(conf,
165         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
166         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
167         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
168 
169       // check if application wants to collect scan metrics
170       initScanMetrics(scan);
171 
172       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
173       if (this.scan.getCaching() > 0) {
174         this.caching = this.scan.getCaching();
175       } else {
176         this.caching = conf.getInt(
177             HConstants.HBASE_CLIENT_SCANNER_CACHING,
178             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
179       }
180 
181       this.caller = rpcFactory.<Result[]> newCaller();
182       this.rpcControllerFactory = controllerFactory;
183 
184       initializeScannerInConstruction();
185     }
186 
187     protected void initializeScannerInConstruction() throws IOException{
188       // initialize the scanner
189       nextScanner(this.caching, false);
190     }
191 
192     protected HConnection getConnection() {
193       return this.connection;
194     }
195 
196     /**
197      * @return Table name
198      * @deprecated Since 0.96.0; use {@link #getTable()}
199      */
200     @Deprecated
201     protected byte [] getTableName() {
202       return this.tableName.getName();
203     }
204 
205     protected TableName getTable() {
206       return this.tableName;
207     }
208 
209     protected Scan getScan() {
210       return scan;
211     }
212 
213     protected long getTimestamp() {
214       return lastNext;
215     }
216 
217     // returns true if the passed region endKey
218     protected boolean checkScanStopRow(final byte [] endKey) {
219       if (this.scan.getStopRow().length > 0) {
220         // there is a stop row, check to see if we are past it.
221         byte [] stopRow = scan.getStopRow();
222         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
223           endKey, 0, endKey.length);
224         if (cmp <= 0) {
225           // stopRow <= endKey (endKey is equals to or larger than stopRow)
226           // This is a stop.
227           return true;
228         }
229       }
230       return false; //unlikely.
231     }
232 
233     /*
234      * Gets a scanner for the next region.  If this.currentRegion != null, then
235      * we will move to the endrow of this.currentRegion.  Else we will get
236      * scanner at the scan.getStartRow().  We will go no further, just tidy
237      * up outstanding scanners, if <code>currentRegion != null</code> and
238      * <code>done</code> is true.
239      * @param nbRows
240      * @param done Server-side says we're done scanning.
241      */
242   protected boolean nextScanner(int nbRows, final boolean done)
243     throws IOException {
244       // Close the previous scanner if it's open
245       if (this.callable != null) {
246         this.callable.setClose();
247         this.caller.callWithRetries(callable);
248         this.callable = null;
249       }
250 
251       // Where to start the next scanner
252       byte [] localStartKey;
253 
254       // if we're at end of table, close and return false to stop iterating
255       if (this.currentRegion != null) {
256         byte [] endKey = this.currentRegion.getEndKey();
257         if (endKey == null ||
258             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
259             checkScanStopRow(endKey) ||
260             done) {
261           close();
262           if (LOG.isTraceEnabled()) {
263             LOG.trace("Finished " + this.currentRegion);
264           }
265           return false;
266         }
267         localStartKey = endKey;
268         if (LOG.isTraceEnabled()) {
269           LOG.trace("Finished " + this.currentRegion);
270         }
271       } else {
272         localStartKey = this.scan.getStartRow();
273       }
274 
275       if (LOG.isDebugEnabled() && this.currentRegion != null) {
276         // Only worth logging if NOT first region in scan.
277         LOG.debug("Advancing internal scanner to startKey at '" +
278           Bytes.toStringBinary(localStartKey) + "'");
279       }
280       try {
281         callable = getScannerCallable(localStartKey, nbRows);
282         // Open a scanner on the region server starting at the
283         // beginning of the region
284         this.caller.callWithRetries(callable);
285         this.currentRegion = callable.getHRegionInfo();
286         if (this.scanMetrics != null) {
287           this.scanMetrics.countOfRegions.incrementAndGet();
288         }
289       } catch (IOException e) {
290         close();
291         throw e;
292       }
293       return true;
294     }
295 
296     @InterfaceAudience.Private
297     protected ScannerCallable getScannerCallable(byte [] localStartKey,
298         int nbRows) {
299       scan.setStartRow(localStartKey);
300       ScannerCallable s = new ScannerCallable(getConnection(),
301         getTable(), scan, this.scanMetrics, rpcControllerFactory.newController());
302       s.setCaching(nbRows);
303       return s;
304     }
305 
306     /**
307      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
308      * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
309      * framework because it doesn't support multi-instances of the same metrics on the same machine;
310      * for scan/map reduce scenarios, we will have multiple scans running at the same time.
311      *
312      * By default, scan metrics are disabled; if the application wants to collect them, this behavior
313      * can be turned on by calling calling:
314      *
315      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
316      */
317     protected void writeScanMetrics() {
318       if (this.scanMetrics == null || scanMetricsPublished) {
319         return;
320       }
321       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
322       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
323       scanMetricsPublished = true;
324     }
325 
326     @Override
327     public Result next() throws IOException {
328       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
329       if (cache.size() == 0 && this.closed) {
330         return null;
331       }
332       if (cache.size() == 0) {
333         loadCache();
334       }
335 
336       if (cache.size() > 0) {
337         return cache.poll();
338       }
339 
340       // if we exhausted this scanner before calling close, write out the scan metrics
341       writeScanMetrics();
342       return null;
343     }
344 
345   /**
346    * Contact the servers to load more {@link Result}s in the cache.
347    */
348   protected void loadCache() throws IOException {
349     Result [] values = null;
350     long remainingResultSize = maxScannerResultSize;
351     int countdown = this.caching;
352     // We need to reset it if it's a new callable that was created
353     // with a countdown in nextScanner
354     callable.setCaching(this.caching);
355     // This flag is set when we want to skip the result returned.  We do
356     // this when we reset scanner because it split under us.
357     boolean skipFirst = false;
358     boolean retryAfterOutOfOrderException  = true;
359     // We don't expect that the server will have more results for us if
360     // it doesn't tell us otherwise. We rely on the size or count of results
361     boolean serverHasMoreResults = false;
362     do {
363       try {
364         if (skipFirst) {
365           // Skip only the first row (which was the last row of the last
366           // already-processed batch).
367           callable.setCaching(1);
368           values = this.caller.callWithRetries(callable);
369           callable.setCaching(this.caching);
370           skipFirst = false;
371         }
372         // Server returns a null values if scanning is to stop.  Else,
373         // returns an empty array if scanning is to go on and we've just
374         // exhausted current region.
375         values = this.caller.callWithRetries(callable);
376         if (skipFirst && values != null && values.length == 1) {
377           skipFirst = false; // Already skipped, unset it before scanning again
378           values = this.caller.callWithRetries(callable);
379         }
380         retryAfterOutOfOrderException  = true;
381       } catch (DoNotRetryIOException e) {
382         // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
383         // to reset the scanner and come back in again.
384         if (e instanceof UnknownScannerException) {
385           long timeout = lastNext + scannerTimeout;
386           // If we are over the timeout, throw this exception to the client wrapped in
387           // a ScannerTimeoutException. Else, it's because the region moved and we used the old
388           // id against the new region server; reset the scanner.
389           if (timeout < System.currentTimeMillis()) {
390             long elapsed = System.currentTimeMillis() - lastNext;
391             ScannerTimeoutException ex = new ScannerTimeoutException(
392                 elapsed + "ms passed since the last invocation, " +
393                     "timeout is currently set to " + scannerTimeout);
394             ex.initCause(e);
395             throw ex;
396           }
397         } else {
398           // If exception is any but the list below throw it back to the client; else setup
399           // the scanner and retry.
400           Throwable cause = e.getCause();
401           if ((cause != null && cause instanceof NotServingRegionException) ||
402             (cause != null && cause instanceof RegionServerStoppedException) ||
403             e instanceof OutOfOrderScannerNextException) {
404             // Pass
405             // It is easier writing the if loop test as list of what is allowed rather than
406             // as a list of what is not allowed... so if in here, it means we do not throw.
407           } else {
408             throw e;
409           }
410         }
411         // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
412         if (this.lastResult != null) {
413           // The region has moved. We need to open a brand new scanner at
414           // the new location.
415           // Reset the startRow to the row we've seen last so that the new
416           // scanner starts at the correct row. Otherwise we may see previously
417           // returned rows again.
418           // (ScannerCallable by now has "relocated" the correct region)
419           this.scan.setStartRow(this.lastResult.getRow());
420 
421           // Skip first row returned.  We already let it out on previous
422           // invocation.
423           skipFirst = true;
424         }
425         if (e instanceof OutOfOrderScannerNextException) {
426           if (retryAfterOutOfOrderException) {
427             retryAfterOutOfOrderException = false;
428           } else {
429             // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
430             throw new DoNotRetryIOException("Failed after retry of " +
431               "OutOfOrderScannerNextException: was there a rpc timeout?", e);
432           }
433         }
434         // Clear region.
435         this.currentRegion = null;
436         // Set this to zero so we don't try and do an rpc and close on remote server when
437         // the exception we got was UnknownScanner or the Server is going down.
438         callable = null;
439         // This continue will take us to while at end of loop where we will set up new scanner.
440         continue;
441       }
442       long currentTime = System.currentTimeMillis();
443       if (this.scanMetrics != null ) {
444         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
445       }
446       lastNext = currentTime;
447       if (values != null && values.length > 0) {
448         for (Result rs : values) {
449           cache.add(rs);
450           for (Cell kv : rs.rawCells()) {
451             // TODO make method in Cell or CellUtil
452             remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
453           }
454           countdown--;
455           this.lastResult = rs;
456         }
457       }
458       // We expect that the server won't have more results for us when we exhaust
459       // the size (bytes or count) of the results returned. If the server *does* inform us that
460       // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
461       // get results is the moreResults context valid.
462       if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
463         // Only adhere to more server results when we don't have any partialResults
464         // as it keeps the outer loop logic the same.
465         serverHasMoreResults = callable.getServerHasMoreResults();
466       }
467       // Values == null means server-side filter has determined we must STOP
468       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
469       // row. We should not change scanners before we receive all the partial Results for that
470       // row.
471     } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
472         && nextScanner(countdown, values == null));
473   }
474 
475     @Override
476     public void close() {
477       if (!scanMetricsPublished) writeScanMetrics();
478       if (callable != null) {
479         callable.setClose();
480         try {
481           this.caller.callWithRetries(callable);
482         } catch (UnknownScannerException e) {
483            // We used to catch this error, interpret, and rethrow. However, we
484            // have since decided that it's not nice for a scanner's close to
485            // throw exceptions. Chances are it was just due to lease time out.
486         } catch (IOException e) {
487            /* An exception other than UnknownScanner is unexpected. */
488            LOG.warn("scanner failed to close. Exception follows: " + e);
489         }
490         callable = null;
491       }
492       closed = true;
493     }
494 }