View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.CountDownLatch;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.DoNotRetryIOException;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.KeyValue.KVComparator;
39  import org.apache.hadoop.hbase.KeyValueUtil;
40  import org.apache.hadoop.hbase.client.IsolationLevel;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.executor.ExecutorService;
43  import org.apache.hadoop.hbase.filter.Filter;
44  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
45  import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  
49  /**
50   * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
51   * into List<KeyValue> for a single row.
52   */
53  @InterfaceAudience.Private
54  public class StoreScanner extends NonReversedNonLazyKeyValueScanner
55      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
56    static final Log LOG = LogFactory.getLog(StoreScanner.class);
57    protected Store store;
58    protected ScanQueryMatcher matcher;
59    protected KeyValueHeap heap;
60    protected boolean cacheBlocks;
61  
62    protected int countPerRow = 0;
63    protected int storeLimit = -1;
64    protected int storeOffset = 0;
65  
66    // Used to indicate that the scanner has closed (see HBASE-1107)
67    // Doesnt need to be volatile because it's always accessed via synchronized methods
68    protected boolean closing = false;
69    protected final boolean isGet;
70    protected final boolean explicitColumnQuery;
71    protected final boolean useRowColBloom;
72    /**
73     * A flag that enables StoreFileScanner parallel-seeking
74     */
75    protected boolean isParallelSeekEnabled = false;
76    protected ExecutorService executor;
77    protected final Scan scan;
78    protected final NavigableSet<byte[]> columns;
79    protected final long oldestUnexpiredTS;
80    protected final long now;
81    protected final int minVersions;
82  
83    /**
84     * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
85     * KVs skipped via seeking to next row/column. TODO: estimate them?
86     */
87    private long kvsScanned = 0;
88    private KeyValue prevKV = null;
89  
90    /** We don't ever expect to change this, the constant is just for clarity. */
91    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
92    public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
93        "hbase.storescanner.parallel.seek.enable";
94  
95    /** Used during unit testing to ensure that lazy seek does save seek ops */
96    protected static boolean lazySeekEnabledGlobally =
97        LAZY_SEEK_ENABLED_BY_DEFAULT;
98  
99    // if heap == null and lastTop != null, you need to reseek given the key below
100   protected KeyValue lastTop = null;
101 
102   // A flag whether use pread for scan
103   private boolean scanUsePread = false;
104   protected ReentrantLock lock = new ReentrantLock();
105   
106   private final long readPt;
107 
108   // used by the injection framework to test race between StoreScanner construction and compaction
109   enum StoreScannerCompactionRace {
110     BEFORE_SEEK,
111     AFTER_SEEK,
112     COMPACT_COMPLETE
113   }
114   
115   /** An internal constructor. */
116   protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
117       final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
118     this.readPt = readPt;
119     this.store = store;
120     this.cacheBlocks = cacheBlocks;
121     isGet = scan.isGetScan();
122     int numCol = columns == null ? 0 : columns.size();
123     explicitColumnQuery = numCol > 0;
124     this.scan = scan;
125     this.columns = columns;
126     this.now = EnvironmentEdgeManager.currentTimeMillis();
127     this.oldestUnexpiredTS = now - ttl;
128     this.minVersions = minVersions;
129 
130     if (store != null && ((HStore)store).getHRegion() != null
131         && ((HStore)store).getHRegion().getBaseConf() != null) {
132       Configuration conf = ((HStore) store).getHRegion().getBaseConf();
133       this.scanUsePread = conf.getBoolean("hbase.storescanner.use.pread", scan.isSmall());
134     } else {
135       this.scanUsePread = scan.isSmall();
136     }
137 
138     // We look up row-column Bloom filters for multi-column queries as part of
139     // the seek operation. However, we also look the row-column Bloom filter
140     // for multi-row (non-"get") scans because this is not done in
141     // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
142     useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
143 
144     // The parallel-seeking is on :
145     // 1) the config value is *true*
146     // 2) store has more than one store file
147     if (store != null && ((HStore)store).getHRegion() != null
148         && store.getStorefilesCount() > 1) {
149       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
150       if (rsService == null || !rsService.getConfiguration().getBoolean(
151             STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
152       isParallelSeekEnabled = true;
153       executor = rsService.getExecutorService();
154     }
155   }
156 
157   /**
158    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
159    * are not in a compaction.
160    *
161    * @param store who we scan
162    * @param scan the spec
163    * @param columns which columns we are scanning
164    * @throws IOException
165    */
166   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
167       long readPt)
168                               throws IOException {
169     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
170         scanInfo.getMinVersions(), readPt);
171     if (columns != null && scan.isRaw()) {
172       throw new DoNotRetryIOException(
173           "Cannot specify any column for a raw scan");
174     }
175     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
176         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
177         oldestUnexpiredTS, now, store.getCoprocessorHost());
178 
179     this.store.addChangedReaderObserver(this);
180 
181     // Pass columns to try to filter out unnecessary StoreFiles.
182     List<KeyValueScanner> scanners = getScannersNoCompaction();
183 
184     // Seek all scanners to the start of the Row (or if the exact matching row
185     // key does not exist, then to the start of the next matching Row).
186     // Always check bloom filter to optimize the top row seek for delete
187     // family marker.
188     seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
189         && lazySeekEnabledGlobally, isParallelSeekEnabled);
190 
191     // set storeLimit
192     this.storeLimit = scan.getMaxResultsPerColumnFamily();
193 
194     // set rowOffset
195     this.storeOffset = scan.getRowOffsetPerColumnFamily();
196 
197     // Combine all seeked scanners with a heap
198     resetKVHeap(scanners, store.getComparator());
199   }
200 
201   /**
202    * Used for compactions.<p>
203    *
204    * Opens a scanner across specified StoreFiles.
205    * @param store who we scan
206    * @param scan the spec
207    * @param scanners ancillary scanners
208    * @param smallestReadPoint the readPoint that we should use for tracking
209    *          versions
210    */
211   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
212       List<? extends KeyValueScanner> scanners, ScanType scanType,
213       long smallestReadPoint, long earliestPutTs) throws IOException {
214     this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
215   }
216 
217   /**
218    * Used for compactions that drop deletes from a limited range of rows.<p>
219    *
220    * Opens a scanner across specified StoreFiles.
221    * @param store who we scan
222    * @param scan the spec
223    * @param scanners ancillary scanners
224    * @param smallestReadPoint the readPoint that we should use for tracking versions
225    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
226    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
227    */
228   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
229       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
230       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
231     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
232         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
233   }
234 
235   private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
236       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
237       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
238     this(store, false, scan, null, scanInfo.getTtl(), scanInfo.getMinVersions(),
239         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
240     if (dropDeletesFromRow == null) {
241       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
242           earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
243     } else {
244       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
245           oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
246     }
247 
248     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
249     scanners = selectScannersFrom(scanners);
250 
251     // Seek all scanners to the initial key
252     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
253 
254     // Combine all seeked scanners with a heap
255     resetKVHeap(scanners, store.getComparator());
256   }
257 
258   /** Constructor for testing. */
259   StoreScanner(final Scan scan, ScanInfo scanInfo,
260       ScanType scanType, final NavigableSet<byte[]> columns,
261       final List<KeyValueScanner> scanners) throws IOException {
262     this(scan, scanInfo, scanType, columns, scanners,
263         HConstants.LATEST_TIMESTAMP,
264         // 0 is passed as readpoint because the test bypasses Store
265         0);
266   }
267 
268   // Constructor for testing.
269   StoreScanner(final Scan scan, ScanInfo scanInfo,
270     ScanType scanType, final NavigableSet<byte[]> columns,
271     final List<KeyValueScanner> scanners, long earliestPutTs)
272         throws IOException {
273     this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
274       // 0 is passed as readpoint because the test bypasses Store
275       0);
276   }
277   
278   private StoreScanner(final Scan scan, ScanInfo scanInfo,
279       ScanType scanType, final NavigableSet<byte[]> columns,
280       final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
281           throws IOException {
282     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
283         scanInfo.getMinVersions(), readPt);
284     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
285         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
286 
287     // In unit tests, the store could be null
288     if (this.store != null) {
289       this.store.addChangedReaderObserver(this);
290     }
291     // Seek all scanners to the initial key
292     seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
293     resetKVHeap(scanners, scanInfo.getComparator());
294   }
295 
296   /**
297    * Get a filtered list of scanners. Assumes we are not in a compaction.
298    * @return list of scanners to seek
299    */
300   protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
301     final boolean isCompaction = false;
302     boolean usePread = isGet || scanUsePread;
303     return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
304         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
305   }
306 
307   /**
308    * Seek the specified scanners with the given key
309    * @param scanners
310    * @param seekKey
311    * @param isLazy true if using lazy seek
312    * @param isParallelSeek true if using parallel seek
313    * @throws IOException
314    */
315   protected void seekScanners(List<? extends KeyValueScanner> scanners,
316       KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
317       throws IOException {
318     // Seek all scanners to the start of the Row (or if the exact matching row
319     // key does not exist, then to the start of the next matching Row).
320     // Always check bloom filter to optimize the top row seek for delete
321     // family marker.
322     if (isLazy) {
323       for (KeyValueScanner scanner : scanners) {
324         scanner.requestSeek(seekKey, false, true);
325       }
326     } else {
327       if (!isParallelSeek) {
328         for (KeyValueScanner scanner : scanners) {
329           scanner.seek(seekKey);
330         }
331       } else {
332         parallelSeek(scanners, seekKey);
333       }
334     }
335   }
336 
337   protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
338       KVComparator comparator) throws IOException {
339     // Combine all seeked scanners with a heap
340     heap = new KeyValueHeap(scanners, comparator);
341   }
342 
343   /**
344    * Filters the given list of scanners using Bloom filter, time range, and
345    * TTL.
346    */
347   protected List<KeyValueScanner> selectScannersFrom(
348       final List<? extends KeyValueScanner> allScanners) {
349     boolean memOnly;
350     boolean filesOnly;
351     if (scan instanceof InternalScan) {
352       InternalScan iscan = (InternalScan)scan;
353       memOnly = iscan.isCheckOnlyMemStore();
354       filesOnly = iscan.isCheckOnlyStoreFiles();
355     } else {
356       memOnly = false;
357       filesOnly = false;
358     }
359 
360     List<KeyValueScanner> scanners =
361         new ArrayList<KeyValueScanner>(allScanners.size());
362 
363     // We can only exclude store files based on TTL if minVersions is set to 0.
364     // Otherwise, we might have to return KVs that have technically expired.
365     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
366         Long.MIN_VALUE;
367 
368     // include only those scan files which pass all filters
369     for (KeyValueScanner kvs : allScanners) {
370       boolean isFile = kvs.isFileScanner();
371       if ((!isFile && filesOnly) || (isFile && memOnly)) {
372         continue;
373       }
374 
375       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
376         scanners.add(kvs);
377       }
378     }
379     return scanners;
380   }
381 
382   @Override
383   public KeyValue peek() {
384     lock.lock();
385     try {
386     if (this.heap == null) {
387       return this.lastTop;
388     }
389     return this.heap.peek();
390     } finally {
391       lock.unlock();
392     }
393   }
394 
395   @Override
396   public KeyValue next() {
397     // throw runtime exception perhaps?
398     throw new RuntimeException("Never call StoreScanner.next()");
399   }
400 
401   @Override
402   public void close() {
403     lock.lock();
404     try {
405     if (this.closing) return;
406     this.closing = true;
407     // under test, we dont have a this.store
408     if (this.store != null)
409       this.store.deleteChangedReaderObserver(this);
410     if (this.heap != null)
411       this.heap.close();
412     this.heap = null; // CLOSED!
413     this.lastTop = null; // If both are null, we are closed.
414     } finally {
415       lock.unlock();
416     }
417   }
418 
419   @Override
420   public boolean seek(KeyValue key) throws IOException {
421     lock.lock();
422     try {
423     // reset matcher state, in case that underlying store changed
424     checkReseek();
425     return this.heap.seek(key);
426     } finally {
427       lock.unlock();
428     }
429   }
430 
431   /**
432    * Get the next row of values from this Store.
433    * @param outResult
434    * @param limit
435    * @return true if there are more rows, false if scanner is done
436    */
437   @Override
438   public boolean next(List<Cell> outResult, int limit) throws IOException {
439     lock.lock();
440     try {
441     if (checkReseek()) {
442       return true;
443     }
444 
445     // if the heap was left null, then the scanners had previously run out anyways, close and
446     // return.
447     if (this.heap == null) {
448       close();
449       return false;
450     }
451 
452     KeyValue peeked = this.heap.peek();
453     if (peeked == null) {
454       close();
455       return false;
456     }
457 
458     // only call setRow if the row changes; avoids confusing the query matcher
459     // if scanning intra-row
460     byte[] row = peeked.getBuffer();
461     int offset = peeked.getRowOffset();
462     short length = peeked.getRowLength();
463     if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
464         matcher.rowOffset, matcher.rowLength)) {
465       this.countPerRow = 0;
466       matcher.setRow(row, offset, length);
467     }
468 
469     KeyValue kv;
470 
471     // Only do a sanity-check if store and comparator are available.
472     KeyValue.KVComparator comparator =
473         store != null ? store.getComparator() : null;
474 
475     int count = 0;
476     LOOP: while((kv = this.heap.peek()) != null) {
477       if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
478       checkScanOrder(prevKV, kv, comparator);
479       prevKV = kv;
480 
481       ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
482       qcode = optimize(qcode, kv);
483       switch(qcode) {
484         case INCLUDE:
485         case INCLUDE_AND_SEEK_NEXT_ROW:
486         case INCLUDE_AND_SEEK_NEXT_COL:
487 
488           Filter f = matcher.getFilter();
489           if (f != null) {
490             // TODO convert Scan Query Matcher to be Cell instead of KV based ?
491             kv = KeyValueUtil.ensureKeyValue(f.transformCell(kv));
492           }
493 
494           this.countPerRow++;
495           if (storeLimit > -1 &&
496               this.countPerRow > (storeLimit + storeOffset)) {
497             // do what SEEK_NEXT_ROW does.
498             if (!matcher.moreRowsMayExistAfter(kv)) {
499               return false;
500             }
501             seekToNextRow(kv);
502             break LOOP;
503           }
504 
505           // add to results only if we have skipped #storeOffset kvs
506           // also update metric accordingly
507           if (this.countPerRow > storeOffset) {
508             outResult.add(kv);
509             count++;
510           }
511 
512           if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
513             if (!matcher.moreRowsMayExistAfter(kv)) {
514               return false;
515             }
516             seekToNextRow(kv);
517           } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
518             seekAsDirection(matcher.getKeyForNextColumn(kv));
519           } else {
520             this.heap.next();
521           }
522 
523           if (limit > 0 && (count == limit)) {
524             break LOOP;
525           }
526           continue;
527 
528         case DONE:
529           return true;
530 
531         case DONE_SCAN:
532           close();
533           return false;
534 
535         case SEEK_NEXT_ROW:
536           // This is just a relatively simple end of scan fix, to short-cut end
537           // us if there is an endKey in the scan.
538           if (!matcher.moreRowsMayExistAfter(kv)) {
539             return false;
540           }
541 
542           seekToNextRow(kv);
543           break;
544 
545         case SEEK_NEXT_COL:
546           seekAsDirection(matcher.getKeyForNextColumn(kv));
547           break;
548 
549         case SKIP:
550           this.heap.next();
551           break;
552 
553         case SEEK_NEXT_USING_HINT:
554           // TODO convert resee to Cell?
555           KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
556           if (nextKV != null) {
557             seekAsDirection(nextKV);
558           } else {
559             heap.next();
560           }
561           break;
562 
563         default:
564           throw new RuntimeException("UNEXPECTED");
565       }
566     }
567 
568     if (count > 0) {
569       return true;
570     }
571 
572     // No more keys
573     close();
574     return false;
575     } finally {
576       lock.unlock();
577     }
578   }
579 
580   /*
581    * See if we should actually SEEK or rather just SKIP to the next Cell.
582    * (See HBASE-13109)
583    */
584   private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
585     byte[] nextIndexedKey = getNextIndexedKey();
586     if (nextIndexedKey == null || nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY ||
587         store == null) {
588       return qcode;
589     }
590     switch(qcode) {
591     case INCLUDE_AND_SEEK_NEXT_COL:
592     case SEEK_NEXT_COL:
593     {
594       if (matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
595         return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
596       }
597       break;
598     }
599     case INCLUDE_AND_SEEK_NEXT_ROW:
600     case SEEK_NEXT_ROW:
601     {
602       if (matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
603         return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
604       }
605       break;
606     }
607     default:
608       break;
609     }
610     return qcode;
611   }
612 
613   @Override
614   public boolean next(List<Cell> outResult) throws IOException {
615     return next(outResult, -1);
616   }
617 
618   // Implementation of ChangedReadersObserver
619   @Override
620   public void updateReaders() throws IOException {
621     lock.lock();
622     try {
623     if (this.closing) return;
624 
625     // All public synchronized API calls will call 'checkReseek' which will cause
626     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
627     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
628     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
629     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
630     if (this.heap == null) return;
631 
632     // this could be null.
633     this.lastTop = this.peek();
634 
635     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
636 
637     // close scanners to old obsolete Store files
638     this.heap.close(); // bubble thru and close all scanners.
639     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
640 
641     // Let the next() call handle re-creating and seeking
642     } finally {
643       lock.unlock();
644     }
645   }
646 
647   /**
648    * @return true if top of heap has changed (and KeyValueHeap has to try the
649    *         next KV)
650    * @throws IOException
651    */
652   protected boolean checkReseek() throws IOException {
653     if (this.heap == null && this.lastTop != null) {
654       resetScannerStack(this.lastTop);
655       if (this.heap.peek() == null
656           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
657         LOG.debug("Storescanner.peek() is changed where before = "
658             + this.lastTop.toString() + ",and after = " + this.heap.peek());
659         this.lastTop = null;
660         return true;
661       }
662       this.lastTop = null; // gone!
663     }
664     // else dont need to reseek
665     return false;
666   }
667 
668   protected void resetScannerStack(KeyValue lastTopKey) throws IOException {
669     if (heap != null) {
670       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
671     }
672 
673     /* When we have the scan object, should we not pass it to getScanners()
674      * to get a limited set of scanners? We did so in the constructor and we
675      * could have done it now by storing the scan object from the constructor */
676     List<KeyValueScanner> scanners = getScannersNoCompaction();
677 
678     // Seek all scanners to the initial key
679     seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
680 
681     // Combine all seeked scanners with a heap
682     resetKVHeap(scanners, store.getComparator());
683 
684     // Reset the state of the Query Matcher and set to top row.
685     // Only reset and call setRow if the row changes; avoids confusing the
686     // query matcher if scanning intra-row.
687     KeyValue kv = heap.peek();
688     if (kv == null) {
689       kv = lastTopKey;
690     }
691     byte[] row = kv.getBuffer();
692     int offset = kv.getRowOffset();
693     short length = kv.getRowLength();
694     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
695         matcher.rowOffset, matcher.rowLength)) {
696       this.countPerRow = 0;
697       matcher.reset();
698       matcher.setRow(row, offset, length);
699     }
700   }
701 
702   /**
703    * Check whether scan as expected order
704    * @param prevKV
705    * @param kv
706    * @param comparator
707    * @throws IOException
708    */
709   protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
710       KeyValue.KVComparator comparator) throws IOException {
711     // Check that the heap gives us KVs in an increasing order.
712     assert prevKV == null || comparator == null
713         || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
714         + " followed by a " + "smaller key " + kv + " in cf " + store;
715   }
716 
717   protected boolean seekToNextRow(KeyValue kv) throws IOException {
718     return reseek(matcher.getKeyForNextRow(kv));
719   }
720 
721   /**
722    * Do a reseek in a normal StoreScanner(scan forward)
723    * @param kv
724    * @return true if scanner has values left, false if end of scanner
725    * @throws IOException
726    */
727   protected boolean seekAsDirection(KeyValue kv)
728       throws IOException {
729     return reseek(kv);
730   }
731 
732   @Override
733   public boolean reseek(KeyValue kv) throws IOException {
734     lock.lock();
735     try {
736     //Heap will not be null, if this is called from next() which.
737     //If called from RegionScanner.reseek(...) make sure the scanner
738     //stack is reset if needed.
739     checkReseek();
740     if (explicitColumnQuery && lazySeekEnabledGlobally) {
741       return heap.requestSeek(kv, true, useRowColBloom);
742     }
743     return heap.reseek(kv);
744     } finally {
745       lock.unlock();
746     }
747   }
748 
749   @Override
750   public long getSequenceID() {
751     return 0;
752   }
753 
754   /**
755    * Seek storefiles in parallel to optimize IO latency as much as possible
756    * @param scanners the list {@link KeyValueScanner}s to be read from
757    * @param kv the KeyValue on which the operation is being requested
758    * @throws IOException
759    */
760   private void parallelSeek(final List<? extends KeyValueScanner>
761       scanners, final KeyValue kv) throws IOException {
762     if (scanners.isEmpty()) return;
763     int storeFileScannerCount = scanners.size();
764     CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
765     List<ParallelSeekHandler> handlers = 
766         new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
767     for (KeyValueScanner scanner : scanners) {
768       if (scanner instanceof StoreFileScanner) {
769         ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
770           this.readPt, latch);
771         executor.submit(seekHandler);
772         handlers.add(seekHandler);
773       } else {
774         scanner.seek(kv);
775         latch.countDown();
776       }
777     }
778 
779     try {
780       latch.await();
781     } catch (InterruptedException ie) {
782       throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
783     }
784 
785     for (ParallelSeekHandler handler : handlers) {
786       if (handler.getErr() != null) {
787         throw new IOException(handler.getErr());
788       }
789     }
790   }
791 
792   /**
793    * Used in testing.
794    * @return all scanners in no particular order
795    */
796   List<KeyValueScanner> getAllScannersForTesting() {
797     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
798     KeyValueScanner current = heap.getCurrentForTesting();
799     if (current != null)
800       allScanners.add(current);
801     for (KeyValueScanner scanner : heap.getHeap())
802       allScanners.add(scanner);
803     return allScanners;
804   }
805 
806   static void enableLazySeekGlobally(boolean enable) {
807     lazySeekEnabledGlobally = enable;
808   }
809 
810   /**
811    * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).
812    */
813   public long getEstimatedNumberOfKvsScanned() {
814     return this.kvsScanned;
815   }
816 
817   @Override
818   public byte[] getNextIndexedKey() {
819     return this.heap.getNextIndexedKey();
820   }
821 }
822