View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Map.Entry;
32  import java.util.TreeMap;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.KVComparator;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.ConcatenatedLists;
44  import org.apache.hadoop.util.StringUtils;
45  
46  import com.google.common.collect.ImmutableCollection;
47  import com.google.common.collect.ImmutableList;
48  import com.google.common.collect.Lists;
49  
50  
51  /**
52   * Stripe implementation of StoreFileManager.
53   * Not thread safe - relies on external locking (in HStore). Collections that this class
54   * returns are immutable or unique to the call, so they should be safe.
55   * Stripe store splits the key space of the region into non-overlapping stripes, as well as
56   * some recent files that have all the keys (level 0). Each stripe contains a set of files.
57   * When L0 is compacted, it's split into the files corresponding to existing stripe boundaries,
58   * that can thus be added to stripes.
59   * When scan or get happens, it only has to read the files from the corresponding stripes.
60   * See StripeCompactionPolicy on how the stripes are determined; this class doesn't care.
61   *
62   * This class should work together with StripeCompactionPolicy and StripeCompactor.
63   * With regard to how they work, we make at least the following (reasonable) assumptions:
64   *  - Compaction produces one file per new stripe (if any); that is easy to change.
65   *  - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
66   */
67  @InterfaceAudience.Private
68  public class StripeStoreFileManager
69    implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
70    static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
71  
72    /**
73     * The file metadata fields that contain the stripe information.
74     */
75    public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
76    public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
77  
78    private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
79  
80    /**
81     * The key value used for range boundary, indicating that the boundary is open (i.e. +-inf).
82     */
83    public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
84    final static byte[] INVALID_KEY = null;
85  
86    /**
87     * The state class. Used solely to replace results atomically during
88     * compactions and avoid complicated error handling.
89     */
90    private static class State {
91      /**
92       * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
93       * here. It is invariant that the start row of the stripe is the end row of the previous one
94       * (and is an open boundary for the first one).
95       */
96      public byte[][] stripeEndRows = new byte[0][];
97  
98      /**
99       * Files by stripe. Each element of the list corresponds to stripeEndRow element with the
100      * same index, except the last one. Inside each list, the files are in reverse order by
101      * seqNum. Note that the length of this is one higher than that of stripeEndKeys.
102      */
103     public ArrayList<ImmutableList<StoreFile>> stripeFiles
104       = new ArrayList<ImmutableList<StoreFile>>();
105     /** Level 0. The files are in reverse order by seqNum. */
106     public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
107 
108     /** Cached list of all files in the structure, to return from some calls */
109     public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
110   }
111   private State state = null;
112 
113   /** Cached file metadata (or overrides as the case may be) */
114   private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
115   private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
116   /** Normally invalid key is null, but in the map null is the result for "no key"; so use
117    * the following constant value in these maps instead. Note that this is a constant and
118    * we use it to compare by reference when we read from the map. */
119   private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
120 
121   private final KVComparator kvComparator;
122   private StripeStoreConfig config;
123 
124   private final int blockingFileCount;
125 
126   public StripeStoreFileManager(
127       KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
128     this.kvComparator = kvComparator;
129     this.config = config;
130     this.blockingFileCount = conf.getInt(
131         HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
132   }
133 
134   @Override
135   public void loadFiles(List<StoreFile> storeFiles) {
136     loadUnclassifiedStoreFiles(storeFiles);
137   }
138 
139   @Override
140   public Collection<StoreFile> getStorefiles() {
141     return state.allFilesCached;
142   }
143 
144   @Override
145   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
146     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
147     cmc.mergeResults(null, sfs);
148     debugDumpState("Added new files");
149   }
150 
151   @Override
152   public ImmutableCollection<StoreFile> clearFiles() {
153     ImmutableCollection<StoreFile> result = state.allFilesCached;
154     this.state = new State();
155     this.fileStarts.clear();
156     this.fileEnds.clear();
157     return result;
158   }
159 
160   @Override
161   public int getStorefileCount() {
162     return state.allFilesCached.size();
163   }
164 
165   /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
166    * for details on this methods. */
167   @Override
168   public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
169     KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
170     // Order matters for this call.
171     result.addSublist(state.level0Files);
172     if (!state.stripeFiles.isEmpty()) {
173       int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
174       for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
175         result.addSublist(state.stripeFiles.get(stripeIndex));
176       }
177     }
178     return result.iterator();
179   }
180 
181   /** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
182    * {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, KeyValue)}
183    * for details on this methods. */
184   @Override
185   public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
186       Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final KeyValue candidate) {
187     KeyBeforeConcatenatedLists.Iterator original =
188         (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
189     assert original != null;
190     ArrayList<List<StoreFile>> components = original.getComponents();
191     for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
192       StoreFile sf = components.get(firstIrrelevant).get(0);
193       byte[] endKey = endOf(sf);
194       // Entries are ordered as such: L0, then stripes in reverse order. We never remove
195       // level 0; we remove the stripe, and all subsequent ones, as soon as we find the
196       // first one that cannot possibly have better candidates.
197       if (!isInvalid(endKey) && !isOpen(endKey)
198           && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
199         original.removeComponents(firstIrrelevant);
200         break;
201       }
202     }
203     return original;
204   }
205 
206   @Override
207   /**
208    * Override of getSplitPoint that determines the split point as the boundary between two
209    * stripes, unless it causes significant imbalance between split sides' sizes. In that
210    * case, the split boundary will be chosen from the middle of one of the stripes to
211    * minimize imbalance.
212    * @return The split point, or null if no split is possible.
213    */
214   public byte[] getSplitPoint() throws IOException {
215     if (this.getStorefileCount() == 0) return null;
216     if (state.stripeFiles.size() <= 1) {
217       return getSplitPointFromAllFiles();
218     }
219     int leftIndex = -1, rightIndex = state.stripeFiles.size();
220     long leftSize = 0, rightSize = 0;
221     long lastLeftSize = 0, lastRightSize = 0;
222     while (rightIndex - 1 != leftIndex) {
223       if (leftSize >= rightSize) {
224         --rightIndex;
225         lastRightSize = getStripeFilesSize(rightIndex);
226         rightSize += lastRightSize;
227       } else {
228         ++leftIndex;
229         lastLeftSize = getStripeFilesSize(leftIndex);
230         leftSize += lastLeftSize;
231       }
232     }
233     if (leftSize == 0 || rightSize == 0) {
234       String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
235           + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
236       debugDumpState(errMsg);
237       LOG.warn(errMsg);
238       return getSplitPointFromAllFiles();
239     }
240     double ratio = (double)rightSize / leftSize;
241     if (ratio < 1) {
242       ratio = 1 / ratio;
243     }
244     if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
245 
246     // If the difference between the sides is too large, we could get the proportional key on
247     // the a stripe to equalize the difference, but there's no proportional key method at the
248     // moment, and it's not extremely important.
249     // See if we can achieve better ratio if we split the bigger side in half.
250     boolean isRightLarger = rightSize >= leftSize;
251     double newRatio = isRightLarger
252         ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
253         : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
254     if (newRatio < 1) {
255       newRatio = 1 / newRatio;
256     }
257     if (newRatio >= ratio)  return state.stripeEndRows[leftIndex];
258     LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
259         + newRatio + " configured ratio " + config.getMaxSplitImbalance());
260     // Ok, we may get better ratio, get it.
261     return StoreUtils.getLargestFile(state.stripeFiles.get(
262         isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
263   }
264 
265   private byte[] getSplitPointFromAllFiles() throws IOException {
266     ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
267     sfs.addSublist(state.level0Files);
268     sfs.addAllSublists(state.stripeFiles);
269     if (sfs.isEmpty()) return null;
270     return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
271   }
272 
273   private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
274     return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
275   }
276 
277   @Override
278   public Collection<StoreFile> getFilesForScanOrGet(
279       boolean isGet, byte[] startRow, byte[] stopRow) {
280     if (state.stripeFiles.isEmpty()) {
281       return state.level0Files; // There's just L0.
282     }
283 
284     int firstStripe = findStripeForRow(startRow, true);
285     int lastStripe = findStripeForRow(stopRow, false);
286     assert firstStripe <= lastStripe;
287     if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
288       return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
289     }
290     if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
291       return state.allFilesCached; // We need to read all files.
292     }
293 
294     ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
295     result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
296     result.addSublist(state.level0Files);
297     return result;
298   }
299 
300   @Override
301   public void addCompactionResults(
302     Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
303     // See class comment for the assumptions we make here.
304     LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
305         + " files replaced by " + results.size());
306     // In order to be able to fail in the middle of the operation, we'll operate on lazy
307     // copies and apply the result at the end.
308     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
309     cmc.mergeResults(compactedFiles, results);
310     debugDumpState("Merged compaction results");
311   }
312 
313   @Override
314   public int getStoreCompactionPriority() {
315     // If there's only L0, do what the default store does.
316     // If we are in critical priority, do the same - we don't want to trump all stores all
317     // the time due to how many files we have.
318     int fc = getStorefileCount();
319     if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
320       return this.blockingFileCount - fc;
321     }
322     // If we are in good shape, we don't want to be trumped by all other stores due to how
323     // many files we have, so do an approximate mapping to normal priority range; L0 counts
324     // for all stripes.
325     int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
326     int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
327     return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
328   }
329 
330   /**
331    * Gets the total size of all files in the stripe.
332    * @param stripeIndex Stripe index.
333    * @return Size.
334    */
335   private long getStripeFilesSize(int stripeIndex) {
336     long result = 0;
337     for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
338       result += sf.getReader().length();
339     }
340     return result;
341   }
342 
343   /**
344    * Loads initial store files that were picked up from some physical location pertaining to
345    * this store (presumably). Unlike adding files after compaction, assumes empty initial
346    * sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
347    * go to level 0.
348    * @param storeFiles Store files to add.
349    */
350   private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
351     LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
352     TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
353         new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
354     ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
355     // Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
356     // If needed, we could dynamically determine the stripes in future.
357     for (StoreFile sf : storeFiles) {
358       byte[] startRow = startOf(sf), endRow = endOf(sf);
359       // Validate the range and put the files into place.
360       if (isInvalid(startRow) || isInvalid(endRow)) {
361         insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
362         ensureLevel0Metadata(sf);
363       } else if (!isOpen(startRow) && !isOpen(endRow) &&
364           nonOpenRowCompare(startRow, endRow) >= 0) {
365         LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
366           + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
367         insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
368         ensureLevel0Metadata(sf);
369       } else {
370         ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
371         if (stripe == null) {
372           stripe = new ArrayList<StoreFile>();
373           candidateStripes.put(endRow, stripe);
374         }
375         insertFileIntoStripe(stripe, sf);
376       }
377     }
378     // Possible improvement - for variable-count stripes, if all the files are in L0, we can
379     // instead create single, open-ended stripe with all files.
380 
381     boolean hasOverlaps = false;
382     byte[] expectedStartRow = null; // first stripe can start wherever
383     Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
384         candidateStripes.entrySet().iterator();
385     while (entryIter.hasNext()) {
386       Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
387       ArrayList<StoreFile> files = entry.getValue();
388       // Validate the file start rows, and remove the bad ones to level 0.
389       for (int i = 0; i < files.size(); ++i) {
390         StoreFile sf = files.get(i);
391         byte[] startRow = startOf(sf);
392         if (expectedStartRow == null) {
393           expectedStartRow = startRow; // ensure that first stripe is still consistent
394         } else if (!rowEquals(expectedStartRow, startRow)) {
395           hasOverlaps = true;
396           LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
397               + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
398               + "], to L0 it goes");
399           StoreFile badSf = files.remove(i);
400           insertFileIntoStripe(level0Files, badSf);
401           ensureLevel0Metadata(badSf);
402           --i;
403         }
404       }
405       // Check if any files from the candidate stripe are valid. If so, add a stripe.
406       byte[] endRow = entry.getKey();
407       if (!files.isEmpty()) {
408         expectedStartRow = endRow; // Next stripe must start exactly at that key.
409       } else {
410         entryIter.remove();
411       }
412     }
413 
414     // In the end, there must be open ends on two sides. If not, and there were no errors i.e.
415     // files are consistent, they might be coming from a split. We will treat the boundaries
416     // as open keys anyway, and log the message.
417     // If there were errors, we'll play it safe and dump everything into L0.
418     if (!candidateStripes.isEmpty()) {
419       StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
420       boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
421       if (!isOpen) {
422         LOG.warn("The range of the loaded files does not cover full key space: from ["
423             + Bytes.toString(startOf(firstFile)) + "], to ["
424             + Bytes.toString(candidateStripes.lastKey()) + "]");
425         if (!hasOverlaps) {
426           ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
427           ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
428         } else {
429           LOG.warn("Inconsistent files, everything goes to L0.");
430           for (ArrayList<StoreFile> files : candidateStripes.values()) {
431             for (StoreFile sf : files) {
432               insertFileIntoStripe(level0Files, sf);
433               ensureLevel0Metadata(sf);
434             }
435           }
436           candidateStripes.clear();
437         }
438       }
439     }
440 
441     // Copy the results into the fields.
442     State state = new State();
443     state.level0Files = ImmutableList.copyOf(level0Files);
444     state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
445     state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
446     ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
447     int i = candidateStripes.size() - 1;
448     for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
449       state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
450       newAllFiles.addAll(entry.getValue());
451       if (i > 0) {
452         state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
453       }
454       --i;
455     }
456     state.allFilesCached = ImmutableList.copyOf(newAllFiles);
457     this.state = state;
458     debugDumpState("Files loaded");
459   }
460 
461   private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
462     HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
463     for (StoreFile sf : stripe) {
464       targetMap.put(sf, OPEN_KEY);
465     }
466   }
467 
468   private void ensureLevel0Metadata(StoreFile sf) {
469     if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
470     if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
471   }
472 
473   private void debugDumpState(String string) {
474     if (!LOG.isDebugEnabled()) return;
475     StringBuilder sb = new StringBuilder();
476     sb.append("\n" + string + "; current stripe state is as such:");
477     sb.append("\n level 0 with ")
478         .append(state.level0Files.size())
479         .append(
480           " files: "
481               + StringUtils.humanReadableInt(
482                 StripeCompactionPolicy.getTotalFileSize(state.level0Files)) + ";");
483     for (int i = 0; i < state.stripeFiles.size(); ++i) {
484       String endRow = (i == state.stripeEndRows.length)
485           ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
486       sb.append("\n stripe ending in ")
487           .append(endRow)
488           .append(" with ")
489           .append(state.stripeFiles.get(i).size())
490           .append(
491             " files: "
492                 + StringUtils.humanReadableInt(
493                   StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i))) + ";");
494     }
495     sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
496     sb.append("\n").append(getStorefileCount()).append(" files total.");
497     LOG.debug(sb.toString());
498   }
499 
500   /**
501    * Checks whether the key indicates an open interval boundary (i.e. infinity).
502    */
503   private static final boolean isOpen(byte[] key) {
504     return key != null && key.length == 0;
505   }
506 
507   /**
508    * Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
509    */
510   private static final boolean isInvalid(byte[] key) {
511     return key == INVALID_KEY;
512   }
513 
514   /**
515    * Compare two keys for equality.
516    */
517   private final boolean rowEquals(byte[] k1, byte[] k2) {
518     return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
519   }
520 
521   /**
522    * Compare two keys. Keys must not be open (isOpen(row) == false).
523    */
524   private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
525     assert !isOpen(k1) && !isOpen(k2);
526     return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
527   }
528 
529   /**
530    * Finds the stripe index by end row.
531    */
532   private final int findStripeIndexByEndRow(byte[] endRow) {
533     assert !isInvalid(endRow);
534     if (isOpen(endRow)) return state.stripeEndRows.length;
535     return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
536   }
537 
538   /**
539    * Finds the stripe index for the stripe containing a row provided externally for get/scan.
540    */
541   private final int findStripeForRow(byte[] row, boolean isStart) {
542     if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
543     if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
544     // If there's an exact match below, a stripe ends at "row". Stripe right boundary is
545     // exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
546     // If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
547     // insertion point is the index of the next greater element, or list size if none. The
548     // insertion point happens to be exactly what we need, so we need to add one to the result.
549     return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
550   }
551 
552   @Override
553   public final byte[] getStartRow(int stripeIndex) {
554     return (stripeIndex == 0  ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
555   }
556 
557   @Override
558   public final byte[] getEndRow(int stripeIndex) {
559     return (stripeIndex == state.stripeEndRows.length
560         ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
561   }
562 
563 
564   private byte[] startOf(StoreFile sf) {
565     byte[] result = this.fileStarts.get(sf);
566     return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
567         : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
568   }
569 
570   private byte[] endOf(StoreFile sf) {
571     byte[] result = this.fileEnds.get(sf);
572     return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
573         : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
574   }
575 
576   /**
577    * Inserts a file in the correct place (by seqnum) in a stripe copy.
578    * @param stripe Stripe copy to insert into.
579    * @param sf File to insert.
580    */
581   private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
582     // The only operation for which sorting of the files matters is KeyBefore. Therefore,
583     // we will store the file in reverse order by seqNum from the outset.
584     for (int insertBefore = 0; ; ++insertBefore) {
585       if (insertBefore == stripe.size()
586           || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
587         stripe.add(insertBefore, sf);
588         break;
589       }
590     }
591   }
592 
593   /**
594    * An extension of ConcatenatedLists that has several peculiar properties.
595    * First, one can cut the tail of the logical list by removing last several sub-lists.
596    * Second, items can be removed thru iterator.
597    * Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
598    * On average KeyBefore operation will contain half the stripes as potential candidates,
599    * but will quickly cut down on them as it finds something in the more likely ones; thus,
600    * the above allow us to avoid unnecessary copying of a bunch of lists.
601    */
602   private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
603     @Override
604     public java.util.Iterator<StoreFile> iterator() {
605       return new Iterator();
606     }
607 
608     public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
609       public ArrayList<List<StoreFile>> getComponents() {
610         return components;
611       }
612 
613       public void removeComponents(int startIndex) {
614         List<List<StoreFile>> subList = components.subList(startIndex, components.size());
615         for (List<StoreFile> entry : subList) {
616           size -= entry.size();
617         }
618         assert size >= 0;
619         subList.clear();
620       }
621 
622       @Override
623       public void remove() {
624         if (!this.nextWasCalled) {
625           throw new IllegalStateException("No element to remove");
626         }
627         this.nextWasCalled = false;
628         List<StoreFile> src = components.get(currentComponent);
629         if (src instanceof ImmutableList<?>) {
630           src = new ArrayList<StoreFile>(src);
631           components.set(currentComponent, src);
632         }
633         src.remove(indexWithinComponent);
634         --size;
635         --indexWithinComponent;
636         if (src.isEmpty()) {
637           components.remove(currentComponent); // indexWithinComponent is already -1 here.
638         }
639       }
640     }
641   }
642 
643   /**
644    * Non-static helper class for merging compaction or flush results.
645    * Since we want to merge them atomically (more or less), it operates on lazy copies,
646    * then creates a new state object and puts it in place.
647    */
648   private class CompactionOrFlushMergeCopy {
649     private ArrayList<List<StoreFile>> stripeFiles = null;
650     private ArrayList<StoreFile> level0Files = null;
651     private ArrayList<byte[]> stripeEndRows = null;
652 
653     private Collection<StoreFile> compactedFiles = null;
654     private Collection<StoreFile> results = null;
655 
656     private List<StoreFile> l0Results = new ArrayList<StoreFile>();
657     private final boolean isFlush;
658 
659     public CompactionOrFlushMergeCopy(boolean isFlush) {
660       // Create a lazy mutable copy (other fields are so lazy they start out as nulls).
661       this.stripeFiles = new ArrayList<List<StoreFile>>(
662           StripeStoreFileManager.this.state.stripeFiles);
663       this.isFlush = isFlush;
664     }
665 
666     public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
667         throws IOException {
668       assert this.compactedFiles == null && this.results == null;
669       this.compactedFiles = compactedFiles;
670       this.results = results;
671       // Do logical processing.
672       if (!isFlush) removeCompactedFiles();
673       TreeMap<byte[], StoreFile> newStripes = processResults();
674       if (newStripes != null) {
675         processNewCandidateStripes(newStripes);
676       }
677       // Create new state and update parent.
678       State state = createNewState();
679       StripeStoreFileManager.this.state = state;
680       updateMetadataMaps();
681     }
682 
683     private State createNewState() {
684       State oldState = StripeStoreFileManager.this.state;
685       // Stripe count should be the same unless the end rows changed.
686       assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
687       State newState = new State();
688       newState.level0Files = (this.level0Files == null) ? oldState.level0Files
689           : ImmutableList.copyOf(this.level0Files);
690       newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
691           : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
692       newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
693       for (List<StoreFile> newStripe : this.stripeFiles) {
694         newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
695             ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
696       }
697 
698       List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
699       if (!isFlush) newAllFiles.removeAll(compactedFiles);
700       newAllFiles.addAll(results);
701       newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
702       return newState;
703     }
704 
705     private void updateMetadataMaps() {
706       StripeStoreFileManager parent = StripeStoreFileManager.this;
707       if (!isFlush) {
708         for (StoreFile sf : this.compactedFiles) {
709           parent.fileStarts.remove(sf);
710           parent.fileEnds.remove(sf);
711         }
712       }
713       if (this.l0Results != null) {
714         for (StoreFile sf : this.l0Results) {
715           parent.ensureLevel0Metadata(sf);
716         }
717       }
718     }
719 
720     /**
721      * @param index Index of the stripe we need.
722      * @return A lazy stripe copy from current stripes.
723      */
724     private final ArrayList<StoreFile> getStripeCopy(int index) {
725       List<StoreFile> stripeCopy = this.stripeFiles.get(index);
726       ArrayList<StoreFile> result = null;
727       if (stripeCopy instanceof ImmutableList<?>) {
728         result = new ArrayList<StoreFile>(stripeCopy);
729         this.stripeFiles.set(index, result);
730       } else {
731         result = (ArrayList<StoreFile>)stripeCopy;
732       }
733       return result;
734     }
735 
736     /**
737      * @return A lazy L0 copy from current state.
738      */
739     private final ArrayList<StoreFile> getLevel0Copy() {
740       if (this.level0Files == null) {
741         this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
742       }
743       return this.level0Files;
744     }
745 
746     /**
747      * Process new files, and add them either to the structure of existing stripes,
748      * or to the list of new candidate stripes.
749      * @return New candidate stripes.
750      */
751     private TreeMap<byte[], StoreFile> processResults() throws IOException {
752       TreeMap<byte[], StoreFile> newStripes = null;
753       for (StoreFile sf : this.results) {
754         byte[] startRow = startOf(sf), endRow = endOf(sf);
755         if (isInvalid(endRow) || isInvalid(startRow)) {
756           if (!isFlush) {
757             LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
758           }
759           insertFileIntoStripe(getLevel0Copy(), sf);
760           this.l0Results.add(sf);
761           continue;
762         }
763         if (!this.stripeFiles.isEmpty()) {
764           int stripeIndex = findStripeIndexByEndRow(endRow);
765           if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
766             // Simple/common case - add file to an existing stripe.
767             insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
768             continue;
769           }
770         }
771 
772         // Make a new candidate stripe.
773         if (newStripes == null) {
774           newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
775         }
776         StoreFile oldSf = newStripes.put(endRow, sf);
777         if (oldSf != null) {
778           throw new IOException("Compactor has produced multiple files for the stripe ending in ["
779               + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
780         }
781       }
782       return newStripes;
783     }
784 
785     /**
786      * Remove compacted files.
787      * @param compactedFiles Compacted files.
788      */
789     private void removeCompactedFiles() throws IOException {
790       for (StoreFile oldFile : this.compactedFiles) {
791         byte[] oldEndRow = endOf(oldFile);
792         List<StoreFile> source = null;
793         if (isInvalid(oldEndRow)) {
794           source = getLevel0Copy();
795         } else {
796           int stripeIndex = findStripeIndexByEndRow(oldEndRow);
797           if (stripeIndex < 0) {
798             throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
799                 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
800           }
801           source = getStripeCopy(stripeIndex);
802         }
803         if (!source.remove(oldFile)) {
804           throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
805         }
806       }
807     }
808 
809     /**
810      * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
811      * new candidate stripes/removes old stripes; produces new set of stripe end rows.
812      * @param newStripes  New stripes - files by end row.
813      */
814     private void processNewCandidateStripes(
815         TreeMap<byte[], StoreFile> newStripes) throws IOException {
816       // Validate that the removed and added aggregate ranges still make for a full key space.
817       boolean hasStripes = !this.stripeFiles.isEmpty();
818       this.stripeEndRows = new ArrayList<byte[]>(
819           Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
820       int removeFrom = 0;
821       byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
822       byte[] lastEndRow = newStripes.lastKey();
823       if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
824         throw new IOException("Newly created stripes do not cover the entire key space.");
825       }
826 
827       boolean canAddNewStripes = true;
828       Collection<StoreFile> filesForL0 = null;
829       if (hasStripes) {
830         // Determine which stripes will need to be removed because they conflict with new stripes.
831         // The new boundaries should match old stripe boundaries, so we should get exact matches.
832         if (isOpen(firstStartRow)) {
833           removeFrom = 0;
834         } else {
835           removeFrom = findStripeIndexByEndRow(firstStartRow);
836           if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
837           ++removeFrom;
838         }
839         int removeTo = findStripeIndexByEndRow(lastEndRow);
840         if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
841         // See if there are files in the stripes we are trying to replace.
842         ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
843         for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
844           conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
845         }
846         if (!conflictingFiles.isEmpty()) {
847           // This can be caused by two things - concurrent flush into stripes, or a bug.
848           // Unfortunately, we cannot tell them apart without looking at timing or something
849           // like that. We will assume we are dealing with a flush and dump it into L0.
850           if (isFlush) {
851             long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
852             LOG.warn("Stripes were created by a flush, but results of size " + newSize
853                 + " cannot be added because the stripes have changed");
854             canAddNewStripes = false;
855             filesForL0 = newStripes.values();
856           } else {
857             long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
858             LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
859                 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
860             filesForL0 = conflictingFiles;
861           }
862           if (filesForL0 != null) {
863             for (StoreFile sf : filesForL0) {
864               insertFileIntoStripe(getLevel0Copy(), sf);
865             }
866             l0Results.addAll(filesForL0);
867           }
868         }
869 
870         if (canAddNewStripes) {
871           // Remove old empty stripes.
872           int originalCount = this.stripeFiles.size();
873           for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
874             if (removeIndex != originalCount - 1) {
875               this.stripeEndRows.remove(removeIndex);
876             }
877             this.stripeFiles.remove(removeIndex);
878           }
879         }
880       }
881 
882       if (!canAddNewStripes) return; // Files were already put into L0.
883 
884       // Now, insert new stripes. The total ranges match, so we can insert where we removed.
885       byte[] previousEndRow = null;
886       int insertAt = removeFrom;
887       for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
888         if (previousEndRow != null) {
889           // Validate that the ranges are contiguous.
890           assert !isOpen(previousEndRow);
891           byte[] startRow = startOf(newStripe.getValue());
892           if (!rowEquals(previousEndRow, startRow)) {
893             throw new IOException("The new stripes produced by "
894                 + (isFlush ? "flush" : "compaction") + " are not contiguous");
895           }
896         }
897         // Add the new stripe.
898         ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
899         tmp.add(newStripe.getValue());
900         stripeFiles.add(insertAt, tmp);
901         previousEndRow = newStripe.getKey();
902         if (!isOpen(previousEndRow)) {
903           stripeEndRows.add(insertAt, previousEndRow);
904         }
905         ++insertAt;
906       }
907     }
908   }
909 
910   @Override
911   public List<StoreFile> getLevel0Files() {
912     return this.state.level0Files;
913   }
914 
915   @Override
916   public List<byte[]> getStripeBoundaries() {
917     if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
918     ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
919     result.add(OPEN_KEY);
920     Collections.addAll(result, this.state.stripeEndRows);
921     result.add(OPEN_KEY);
922     return result;
923   }
924 
925   @Override
926   public ArrayList<ImmutableList<StoreFile>> getStripes() {
927     return this.state.stripeFiles;
928   }
929 
930   @Override
931   public int getStripeCount() {
932     return this.state.stripeFiles.size();
933   }
934 
935   @Override
936   public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
937     // 1) We can never get rid of the last file which has the maximum seqid in a stripe.
938     // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
939     State state = this.state;
940     Collection<StoreFile> expiredStoreFiles = null;
941     for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
942       expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
943     }
944     return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
945   }
946 
947   private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
948       List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
949     // Order by seqnum is reversed.
950     for (int i = 1; i < stripe.size(); ++i) {
951       StoreFile sf = stripe.get(i);
952       long fileTs = sf.getReader().getMaxTimestamp();
953       if (fileTs < maxTs && !filesCompacting.contains(sf)) {
954         LOG.info("Found an expired store file: " + sf.getPath()
955             + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
956         if (expiredStoreFiles == null) {
957           expiredStoreFiles = new ArrayList<StoreFile>();
958         }
959         expiredStoreFiles.add(sf);
960       }
961     }
962     return expiredStoreFiles;
963   }
964 
965   @Override
966   public double getCompactionPressure() {
967     State stateLocal = this.state;
968     if (stateLocal.allFilesCached.size() > blockingFileCount) {
969       // just a hit to tell others that we have reached the blocking file count.
970       return 2.0;
971     }
972     if (stateLocal.stripeFiles.isEmpty()) {
973       return 0.0;
974     }
975     int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
976     // do not calculate L0 separately because data will be moved to stripe quickly and in most cases
977     // we flush data to stripe directly.
978     int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
979     double max = 0.0;
980     for (ImmutableList<StoreFile> stripeFile : stateLocal.stripeFiles) {
981       int stripeFileCount = stripeFile.size();
982       double normCount =
983           (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
984               / (blockingFilePerStripe - config.getStripeCompactMinFiles());
985       if (normCount >= 1.0) {
986         // This could happen if stripe is not split evenly. Do not return values that larger than
987         // 1.0 because we have not reached the blocking file count actually.
988         return 1.0;
989       }
990       if (normCount > max) {
991         max = normCount;
992       }
993     }
994     return max;
995   }
996 }