/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 */
@InterfaceAudience.Private
public abstract class Compactor {
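  // Typical control flow in a concrete subclass (a minimal sketch, not copied from any one
  // implementation; subclasses such as DefaultCompactor wire these protected helpers together,
  // and supply the request, scanType, writer and throughputController themselves):
  //
  //   FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
  //   long smallestReadPoint = getSmallestReadPoint();
  //   List<StoreFileScanner> scanners = createFileScanners(request.getFiles(), smallestReadPoint);
  //   InternalScanner scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
  //   if (scanner == null) {
  //     scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
  //   }
  //   scanner = postCreateCoprocScanner(request, scanType, scanner);
  //   performCompaction(scanner, writer, smallestReadPoint, throughputController);
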
  private static final Log LOG = LogFactory.getLog(Compactor.class);
  protected CompactionProgress progress;
  protected Configuration conf;
  protected Store store;

  private int compactionKVMax;
  protected Compression.Algorithm compactionCompression;

  // TODO: depending on Store is not ideal, but realistically all current compactors do depend on it.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
  }

  /**
   * TODO: Replace this with CellOutputStream when StoreFile.Writer uses cells.
   */
  public interface CellSink {
    void append(KeyValue kv) throws IOException;
  }
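  // Note: in this version the sink handed to performCompaction is typically a StoreFile.Writer
  // (or a multi-file writer wrapping several of them), which implements this interface.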

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id across all the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Maximum length of tags found in any of the involved files */
    public int maxTagsLength = 0;
  }
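  // Concrete compactors typically feed these values into writer creation, e.g. maxKeyCount for
  // bloom filter sizing and maxTagsLength to decide whether tags need to be written at all
  // (exactly how each value is consumed is up to the subclass).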

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param calculatePutTs Whether earliest put TS is needed.
   * @return Aggregated details of the given files.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
    FileDetails fd = new FileDetails();

    for (StoreFile file : filesToCompact) {
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: when compacting, use getEntries rather than getFilterEntries; otherwise under-sized
      // blooms, or a user switching bloom type (e.g. from ROW to ROWCOL), can cause progress to
      // be miscalculated.
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
      if (tmp != null) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (calculatePutTs) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no earliest-put-ts information; it must be an old one,
          // so assume it contains very old puts.
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (calculatePutTs ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point across all scanners in the region.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
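    // The boolean arguments are assumed to follow the getScannersForStoreFiles overload's
    // cacheBlocks/usePread/isCompaction order: compaction reads stream through the files
    // without populating the block cache.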
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  /**
   * Calls coprocessor, if any, to create the compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put timestamp.
   * @param scanners File scanners for compaction files.
   * @return Scanner provided by the coprocessor override; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    return store.getCoprocessorHost()
        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, InternalScanner scanner) throws IOException {
    if (store.getCoprocessorHost() == null) return scanner;
    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
  }

  /**
   * Performs the compaction.
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param throughputController Used to throttle the rate at which compaction writes data.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(InternalScanner scanner, CellSink writer,
      long smallestReadPoint, CompactionThroughputController throughputController)
      throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> kvs = new ArrayList<Cell>();
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTimeMillis();
    }
    String compactionName =
        store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
    long now = 0;
    boolean hasMore;
    throughputController.start(compactionName);
    try {
      do {
        hasMore = scanner.next(kvs, compactionKVMax);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTimeMillis();
        }
        // output to writer:
        for (Cell c : kvs) {
          KeyValue kv = KeyValueUtil.ensureKeyValue(c);
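          // Cells at or below the smallest read point are already visible to every current and
          // future scanner, so their MVCC value no longer matters and is cleared before writing.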
          if (kv.getMvccVersion() <= smallestReadPoint) {
            kv.setMvccVersion(0);
          }
          writer.append(kv);
          int len = kv.getLength();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgress += len;
          }
          throughputController.control(compactionName, len);

          // check periodically to see if a system stop is requested
          if (closeCheckInterval > 0) {
            bytesWritten += len;
            if (bytesWritten > closeCheckInterval) {
              bytesWritten = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= 60 * 1000) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgress = 0;
          }
        }
        kvs.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while controlling throughput of compacting "
          + compactionName);
    } finally {
      throughputController.finish(compactionName);
      progress.complete();
    }
    return true;
  }

  /**
   * Creates the default compaction scanner over the given file scanners.
   * @param store The store.
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * Creates a compaction scanner that can additionally drop deletes within the given row range.
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
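    // A row range is supplied by compactions that cover only part of the store (stripe
    // compactions, for example), since deletes may only be dropped where every cell they could
    // mask is guaranteed to be included in this compaction.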
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }
}