View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Collections;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.KeyValue.KVComparator;
32  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
33  import org.apache.hadoop.hbase.util.Bytes;
34  
35  /**
36   * Base class for cell sink that separates the provided cells into multiple files.
37   */
38  @InterfaceAudience.Private
39  public abstract class StripeMultiFileWriter implements Compactor.CellSink {
  private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);

  /** Factory that is used to produce single StoreFile.Writer-s */
  protected WriterFactory writerFactory;
  /** Comparator used to compare rows against boundaries; assigned in init(). */
  protected KVComparator comparator;

  /**
   * Writers created so far, in boundary order. An entry may be null when a stripe was skipped
   * (commitWriters() ignores such entries). Reset to null by commitWriters()/abortWriters().
   */
  protected List<StoreFile.Writer> existingWriters;
  /**
   * Boundary keys. At commit time there is exactly one more boundary than there are writers:
   * writer i is tagged with boundaries.get(i) as its start key and boundaries.get(i + 1) as
   * its end key.
   */
  protected List<byte[]> boundaries;
  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
  protected StoreScanner sourceScanner;

  /** Whether to write stripe metadata */
  private boolean doWriteStripeMetadata = true;
53  
54    public interface WriterFactory {
55      public StoreFile.Writer createWriter() throws IOException;
56    }
57  
58    /**
59     * Initializes multi-writer before usage.
60     * @param sourceScanner Optional store scanner to obtain the information about read progress.
61     * @param factory Factory used to produce individual file writers.
62     * @param comparator Comparator used to compare rows.
63     */
64    public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
65        throws IOException {
66      this.writerFactory = factory;
67      this.sourceScanner = sourceScanner;
68      this.comparator = comparator;
69    }
70  
71    public void setNoStripeMetadata() {
72      this.doWriteStripeMetadata = false;
73    }
74  
75    public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
76      assert this.existingWriters != null;
77      commitWritersInternal();
78      assert this.boundaries.size() == (this.existingWriters.size() + 1);
79      LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
80        + "riting out metadata for " + this.existingWriters.size() + " writers");
81      List<Path> paths = new ArrayList<Path>();
82      for (int i = 0; i < this.existingWriters.size(); ++i) {
83        StoreFile.Writer writer = this.existingWriters.get(i);
84        if (writer == null) continue; // writer was skipped due to 0 KVs
85        if (doWriteStripeMetadata) {
86          writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
87          writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
88        }
89        writer.appendMetadata(maxSeqId, isMajor);
90        paths.add(writer.getPath());
91        writer.close();
92      }
93      this.existingWriters = null;
94      return paths;
95    }
96  
97    public List<Path> abortWriters() {
98      assert this.existingWriters != null;
99      List<Path> paths = new ArrayList<Path>();
100     for (StoreFile.Writer writer : this.existingWriters) {
101       try {
102         paths.add(writer.getPath());
103         writer.close();
104       } catch (Exception ex) {
105         LOG.error("Failed to close the writer after an unfinished compaction.", ex);
106       }
107     }
108     this.existingWriters = null;
109     return paths;
110   }
111 
112   /**
113    * Subclasses can call this method to make sure the first KV is within multi-writer range.
114    * @param left The left boundary of the writer.
115    * @param row The row to check.
116    * @param rowOffset Offset for row.
117    * @param rowLength Length for row.
118    */
119   protected void sanityCheckLeft(
120       byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
121     if (StripeStoreFileManager.OPEN_KEY != left &&
122         comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
123       String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
124         + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
125       LOG.error(error);
126       throw new IOException(error);
127     }
128   }
129 
130   /**
131    * Subclasses can call this method to make sure the last KV is within multi-writer range.
132    * @param right The right boundary of the writer.
133    * @param row The row to check.
134    * @param rowOffset Offset for row.
135    * @param rowLength Length for row.
136    */
137   protected void sanityCheckRight(
138       byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
139     if (StripeStoreFileManager.OPEN_KEY != right &&
140         comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
141       String error = "The last row is higher or equal than the right boundary of ["
142           + Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
143       LOG.error(error);
144       throw new IOException(error);
145     }
146   }
147 
  /**
   * Subclasses override this method to be called at the end of a successful sequence of
   * append; all appends are processed before this method is called.
   * It is invoked by commitWriters() before any per-writer metadata is appended or any writer
   * is closed; once it returns, commitWriters() asserts that the boundaries list holds exactly
   * one more element than the writer list.
   * @throws IOException if the subclass cannot finalize its writers.
   */
  protected abstract void commitWritersInternal() throws IOException;
153 
154   /**
155    * MultiWriter that separates the cells based on fixed row-key boundaries.
156    * All the KVs between each pair of neighboring boundaries from the list supplied to ctor
157    * will end up in one file, and separate from all other such pairs.
158    */
159   public static class BoundaryMultiWriter extends StripeMultiFileWriter {
160     private StoreFile.Writer currentWriter;
161     private byte[] currentWriterEndKey;
162 
163     private KeyValue lastKv;
164     private long kvsInCurrentWriter = 0;
165     private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
166     private boolean hasAnyWriter = false;
167 
168     /**
169      * @param targetBoundaries The boundaries on which writers/files are separated.
170      * @param majorRangeFrom Major range is the range for which at least one file should be
171      *                       written (because all files are included in compaction).
172      *                       majorRangeFrom is the left boundary.
173      * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
174      */
175     public BoundaryMultiWriter(List<byte[]> targetBoundaries,
176         byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
177       super();
178       this.boundaries = targetBoundaries;
179       this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
180       // "major" range (range for which all files are included) boundaries, if any,
181       // must match some target boundaries, let's find them.
182       assert  (majorRangeFrom == null) == (majorRangeTo == null);
183       if (majorRangeFrom != null) {
184         majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
185           : Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
186         majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
187           : Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
188         if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
189           throw new IOException("Major range does not match writer boundaries: [" +
190               Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
191               + majorRangeFromIndex + " to " + majorRangeToIndex);
192         }
193       }
194     }
195 
196     @Override
197     public void append(KeyValue kv) throws IOException {
198       if (currentWriter == null && existingWriters.isEmpty()) {
199         // First append ever, do a sanity check.
200         sanityCheckLeft(this.boundaries.get(0),
201             kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
202       }
203       prepareWriterFor(kv);
204       currentWriter.append(kv);
205       lastKv = kv; // for the sanity check
206       ++kvsInCurrentWriter;
207     }
208 
209     private boolean isKvAfterCurrentWriter(KeyValue kv) {
210       return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
211             (comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
212                 currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
213     }
214 
215     @Override
216     protected void commitWritersInternal() throws IOException {
217       stopUsingCurrentWriter();
218       while (existingWriters.size() < boundaries.size() - 1) {
219         createEmptyWriter();
220       }
221       if (lastKv != null) {
222         sanityCheckRight(boundaries.get(boundaries.size() - 1),
223             lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
224       }
225     }
226 
227     private void prepareWriterFor(KeyValue kv) throws IOException {
228       if (currentWriter != null && !isKvAfterCurrentWriter(kv)) return; // Use same writer.
229 
230       stopUsingCurrentWriter();
231       // See if KV will be past the writer we are about to create; need to add another one.
232       while (isKvAfterCurrentWriter(kv)) {
233         checkCanCreateWriter();
234         createEmptyWriter();
235       }
236       checkCanCreateWriter();
237       hasAnyWriter = true;
238       currentWriter = writerFactory.createWriter();
239       existingWriters.add(currentWriter);
240     }
241 
242     /**
243      * Called if there are no cells for some stripe.
244      * We need to have something in the writer list for this stripe, so that writer-boundary
245      * list indices correspond to each other. We can insert null in the writer list for that
246      * purpose, except in the following cases where we actually need a file:
247      * 1) If we are in range for which we are compacting all the files, we need to create an
248      * empty file to preserve stripe metadata.
249      * 2) If we have not produced any file at all for this compactions, and this is the
250      * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
251      */
252     private void createEmptyWriter() throws IOException {
253       int index = existingWriters.size();
254       boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
255       // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2)
256       boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
257       boolean needEmptyFile = isInMajorRange || isLastWriter;
258       existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
259       hasAnyWriter |= needEmptyFile;
260       currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
261           ? null : boundaries.get(existingWriters.size() + 1);
262     }
263 
264     private void checkCanCreateWriter() throws IOException {
265       int maxWriterCount =  boundaries.size() - 1;
266       assert existingWriters.size() <= maxWriterCount;
267       if (existingWriters.size() >= maxWriterCount) {
268         throw new IOException("Cannot create any more writers (created " + existingWriters.size()
269             + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
270       }
271     }
272 
273     private void stopUsingCurrentWriter() {
274       if (currentWriter != null) {
275         if (LOG.isDebugEnabled()) {
276           LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
277               + "] row; wrote out " + kvsInCurrentWriter + " kvs");
278         }
279         kvsInCurrentWriter = 0;
280       }
281       currentWriter = null;
282       currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
283           ? null : boundaries.get(existingWriters.size() + 1);
284     }
285   }
286 
287   /**
288    * MultiWriter that separates the cells based on target cell number per file and file count.
289    * New file is started every time the target number of KVs is reached, unless the fixed
290    * count of writers has already been created (in that case all the remaining KVs go into
291    * the last writer).
292    */
293   public static class SizeMultiWriter extends StripeMultiFileWriter {
294     private int targetCount;
295     private long targetKvs;
296     private byte[] left;
297     private byte[] right;
298 
299     private KeyValue lastKv;
300     private StoreFile.Writer currentWriter;
301     protected byte[] lastRowInCurrentWriter = null;
302     private long kvsInCurrentWriter = 0;
303     private long kvsSeen = 0;
304     private long kvsSeenInPrevious = 0;
305 
306     /**
307      * @param targetCount The maximum count of writers that can be created.
308      * @param targetKvs The number of KVs to read from source before starting each new writer.
309      * @param left The left boundary of the first writer.
310      * @param right The right boundary of the last writer.
311      */
312     public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
313       super();
314       this.targetCount = targetCount;
315       this.targetKvs = targetKvs;
316       this.left = left;
317       this.right = right;
318       int preallocate = Math.min(this.targetCount, 64);
319       this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
320       this.boundaries = new ArrayList<byte[]>(preallocate + 1);
321     }
322 
323     @Override
324     public void append(KeyValue kv) throws IOException {
325       // If we are waiting for opportunity to close and we started writing different row,
326       // discard the writer and stop waiting.
327       boolean doCreateWriter = false;
328       if (currentWriter == null) {
329         // First append ever, do a sanity check.
330         sanityCheckLeft(left, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
331         doCreateWriter = true;
332       } else if (lastRowInCurrentWriter != null
333           && !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
334               lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
335         if (LOG.isDebugEnabled()) {
336           LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
337               + "] row; wrote out "  + kvsInCurrentWriter + " kvs");
338         }
339         lastRowInCurrentWriter = null;
340         kvsInCurrentWriter = 0;
341         kvsSeenInPrevious += kvsSeen;
342         doCreateWriter = true;
343       }
344       if (doCreateWriter) {
345         byte[] boundary = existingWriters.isEmpty() ? left : kv.getRow(); // make a copy
346         if (LOG.isDebugEnabled()) {
347           LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
348         }
349         currentWriter = writerFactory.createWriter();
350         boundaries.add(boundary);
351         existingWriters.add(currentWriter);
352       }
353 
354       currentWriter.append(kv);
355       lastKv = kv; // for the sanity check
356       ++kvsInCurrentWriter;
357       kvsSeen = kvsInCurrentWriter;
358       if (this.sourceScanner != null) {
359         kvsSeen = Math.max(kvsSeen,
360             this.sourceScanner.getEstimatedNumberOfKvsScanned() - kvsSeenInPrevious);
361       }
362 
363       // If we are not already waiting for opportunity to close, start waiting if we can
364       // create any more writers and if the current one is too big.
365       if (lastRowInCurrentWriter == null
366           && existingWriters.size() < targetCount
367           && kvsSeen >= targetKvs) {
368         lastRowInCurrentWriter = kv.getRow(); // make a copy
369         if (LOG.isDebugEnabled()) {
370           LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
371               lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs and wrote out "
372               + kvsInCurrentWriter + " kvs");
373         }
374       }
375     }
376 
377     @Override
378     protected void commitWritersInternal() throws IOException {
379       if (LOG.isDebugEnabled()) {
380         LOG.debug("Stopping with "  + kvsInCurrentWriter + " kvs in last writer" +
381             ((this.sourceScanner == null) ? "" : ("; observed estimated "
382                 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
383       }
384       if (lastKv != null) {
385         sanityCheckRight(
386             right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
387       }
388 
389       // When expired stripes were going to be merged into one, and if no writer was created during
390       // the compaction, we need to create an empty file to preserve metadata.
391       if (existingWriters.isEmpty() && 1 == targetCount) {
392         if (LOG.isDebugEnabled()) {
393           LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
394         }
395         boundaries.add(left);
396         existingWriters.add(writerFactory.createWriter());
397       }
398 
399       this.boundaries.add(right);
400     }
401   }
402 }