1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.util.LinkedList;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.DoNotRetryIOException;
30 import org.apache.hadoop.hbase.HBaseConfiguration;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.NotServingRegionException;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.UnknownScannerException;
37 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
38 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
41 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
42 import org.apache.hadoop.hbase.util.Bytes;
43
44
45
46
47
48
49 @InterfaceAudience.Public
50 @InterfaceStability.Stable
51 public class ClientScanner extends AbstractClientScanner {
52 private final Log LOG = LogFactory.getLog(this.getClass());
53 protected Scan scan;
54 protected boolean closed = false;
55
56
57 protected HRegionInfo currentRegion = null;
58 protected ScannerCallable callable = null;
59 protected final LinkedList<Result> cache = new LinkedList<Result>();
60 protected final int caching;
61 protected long lastNext;
62
63 protected Result lastResult = null;
64 protected final long maxScannerResultSize;
65 private final HConnection connection;
66 private final TableName tableName;
67 protected final int scannerTimeout;
68 protected boolean scanMetricsPublished = false;
69 protected RpcRetryingCaller<Result []> caller;
70 protected RpcControllerFactory rpcControllerFactory;
71
72
73
74
75
76
77
78
79
80
81
82 @Deprecated
83 public ClientScanner(final Configuration conf, final Scan scan,
84 final TableName tableName) throws IOException {
85 this(conf, scan, tableName, HConnectionManager.getConnection(conf));
86 }
87
88
89
90
91 @Deprecated
92 public ClientScanner(final Configuration conf, final Scan scan,
93 final byte [] tableName) throws IOException {
94 this(conf, scan, TableName.valueOf(tableName));
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
109 HConnection connection) throws IOException {
110 this(conf, scan, tableName, connection,
111 RpcRetryingCallerFactory.instantiate(conf, connection.getStatisticsTracker()),
112 RpcControllerFactory.instantiate(conf));
113 }
114
115
116
117
118 @Deprecated
119 public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
120 HConnection connection) throws IOException {
121 this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
122 RpcControllerFactory.instantiate(conf));
123 }
124
125
126
127
128
129
130
131 @Deprecated
132 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
133 HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
134 this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
135 }
136
137
138
139
140
141
142
143
144
145
146 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
147 HConnection connection, RpcRetryingCallerFactory rpcFactory,
148 RpcControllerFactory controllerFactory) throws IOException {
149 if (LOG.isTraceEnabled()) {
150 LOG.trace("Scan table=" + tableName
151 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
152 }
153 this.scan = scan;
154 this.tableName = tableName;
155 this.lastNext = System.currentTimeMillis();
156 this.connection = connection;
157 if (scan.getMaxResultSize() > 0) {
158 this.maxScannerResultSize = scan.getMaxResultSize();
159 } else {
160 this.maxScannerResultSize = conf.getLong(
161 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
162 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
163 }
164 this.scannerTimeout = HBaseConfiguration.getInt(conf,
165 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
166 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
167 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
168
169
170 initScanMetrics(scan);
171
172
173 if (this.scan.getCaching() > 0) {
174 this.caching = this.scan.getCaching();
175 } else {
176 this.caching = conf.getInt(
177 HConstants.HBASE_CLIENT_SCANNER_CACHING,
178 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
179 }
180
181 this.caller = rpcFactory.<Result[]> newCaller();
182 this.rpcControllerFactory = controllerFactory;
183
184 initializeScannerInConstruction();
185 }
186
187 protected void initializeScannerInConstruction() throws IOException{
188
189 nextScanner(this.caching, false);
190 }
191
192 protected HConnection getConnection() {
193 return this.connection;
194 }
195
196
197
198
199
200 @Deprecated
201 protected byte [] getTableName() {
202 return this.tableName.getName();
203 }
204
205 protected TableName getTable() {
206 return this.tableName;
207 }
208
209 protected Scan getScan() {
210 return scan;
211 }
212
213 protected long getTimestamp() {
214 return lastNext;
215 }
216
217
218 protected boolean checkScanStopRow(final byte [] endKey) {
219 if (this.scan.getStopRow().length > 0) {
220
221 byte [] stopRow = scan.getStopRow();
222 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
223 endKey, 0, endKey.length);
224 if (cmp <= 0) {
225
226
227 return true;
228 }
229 }
230 return false;
231 }
232
233
234
235
236
237
238
239
240
241
242 protected boolean nextScanner(int nbRows, final boolean done)
243 throws IOException {
244
245 if (this.callable != null) {
246 this.callable.setClose();
247 this.caller.callWithRetries(callable);
248 this.callable = null;
249 }
250
251
252 byte [] localStartKey;
253
254
255 if (this.currentRegion != null) {
256 byte [] endKey = this.currentRegion.getEndKey();
257 if (endKey == null ||
258 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
259 checkScanStopRow(endKey) ||
260 done) {
261 close();
262 if (LOG.isTraceEnabled()) {
263 LOG.trace("Finished " + this.currentRegion);
264 }
265 return false;
266 }
267 localStartKey = endKey;
268 if (LOG.isTraceEnabled()) {
269 LOG.trace("Finished " + this.currentRegion);
270 }
271 } else {
272 localStartKey = this.scan.getStartRow();
273 }
274
275 if (LOG.isDebugEnabled() && this.currentRegion != null) {
276
277 LOG.debug("Advancing internal scanner to startKey at '" +
278 Bytes.toStringBinary(localStartKey) + "'");
279 }
280 try {
281 callable = getScannerCallable(localStartKey, nbRows);
282
283
284 this.caller.callWithRetries(callable);
285 this.currentRegion = callable.getHRegionInfo();
286 if (this.scanMetrics != null) {
287 this.scanMetrics.countOfRegions.incrementAndGet();
288 }
289 } catch (IOException e) {
290 close();
291 throw e;
292 }
293 return true;
294 }
295
296 @InterfaceAudience.Private
297 protected ScannerCallable getScannerCallable(byte [] localStartKey,
298 int nbRows) {
299 scan.setStartRow(localStartKey);
300 ScannerCallable s = new ScannerCallable(getConnection(),
301 getTable(), scan, this.scanMetrics, rpcControllerFactory.newController());
302 s.setCaching(nbRows);
303 return s;
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317 protected void writeScanMetrics() {
318 if (this.scanMetrics == null || scanMetricsPublished) {
319 return;
320 }
321 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
322 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
323 scanMetricsPublished = true;
324 }
325
326 @Override
327 public Result next() throws IOException {
328
329 if (cache.size() == 0 && this.closed) {
330 return null;
331 }
332 if (cache.size() == 0) {
333 loadCache();
334 }
335
336 if (cache.size() > 0) {
337 return cache.poll();
338 }
339
340
341 writeScanMetrics();
342 return null;
343 }
344
345
346
347
348 protected void loadCache() throws IOException {
349 Result [] values = null;
350 long remainingResultSize = maxScannerResultSize;
351 int countdown = this.caching;
352
353
354 callable.setCaching(this.caching);
355
356
357 boolean skipFirst = false;
358 boolean retryAfterOutOfOrderException = true;
359
360
361 boolean serverHasMoreResults = false;
362 do {
363 try {
364 if (skipFirst) {
365
366
367 callable.setCaching(1);
368 values = this.caller.callWithRetries(callable);
369 callable.setCaching(this.caching);
370 skipFirst = false;
371 }
372
373
374
375 values = this.caller.callWithRetries(callable);
376 if (skipFirst && values != null && values.length == 1) {
377 skipFirst = false;
378 values = this.caller.callWithRetries(callable);
379 }
380 retryAfterOutOfOrderException = true;
381 } catch (DoNotRetryIOException e) {
382
383
384 if (e instanceof UnknownScannerException) {
385 long timeout = lastNext + scannerTimeout;
386
387
388
389 if (timeout < System.currentTimeMillis()) {
390 long elapsed = System.currentTimeMillis() - lastNext;
391 ScannerTimeoutException ex = new ScannerTimeoutException(
392 elapsed + "ms passed since the last invocation, " +
393 "timeout is currently set to " + scannerTimeout);
394 ex.initCause(e);
395 throw ex;
396 }
397 } else {
398
399
400 Throwable cause = e.getCause();
401 if ((cause != null && cause instanceof NotServingRegionException) ||
402 (cause != null && cause instanceof RegionServerStoppedException) ||
403 e instanceof OutOfOrderScannerNextException) {
404
405
406
407 } else {
408 throw e;
409 }
410 }
411
412 if (this.lastResult != null) {
413
414
415
416
417
418
419 this.scan.setStartRow(this.lastResult.getRow());
420
421
422
423 skipFirst = true;
424 }
425 if (e instanceof OutOfOrderScannerNextException) {
426 if (retryAfterOutOfOrderException) {
427 retryAfterOutOfOrderException = false;
428 } else {
429
430 throw new DoNotRetryIOException("Failed after retry of " +
431 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
432 }
433 }
434
435 this.currentRegion = null;
436
437
438 callable = null;
439
440 continue;
441 }
442 long currentTime = System.currentTimeMillis();
443 if (this.scanMetrics != null ) {
444 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
445 }
446 lastNext = currentTime;
447 if (values != null && values.length > 0) {
448 for (Result rs : values) {
449 cache.add(rs);
450 for (Cell kv : rs.rawCells()) {
451
452 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
453 }
454 countdown--;
455 this.lastResult = rs;
456 }
457 }
458
459
460
461
462 if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
463
464
465 serverHasMoreResults = callable.getServerHasMoreResults();
466 }
467
468
469
470
471 } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
472 && nextScanner(countdown, values == null));
473 }
474
475 @Override
476 public void close() {
477 if (!scanMetricsPublished) writeScanMetrics();
478 if (callable != null) {
479 callable.setClose();
480 try {
481 this.caller.callWithRetries(callable);
482 } catch (UnknownScannerException e) {
483
484
485
486 } catch (IOException e) {
487
488 LOG.warn("scanner failed to close. Exception follows: " + e);
489 }
490 callable = null;
491 }
492 closed = true;
493 }
494 }