1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.io.compress.Compression;
37 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
38 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
39 import org.apache.hadoop.hbase.regionserver.HStore;
40 import org.apache.hadoop.hbase.regionserver.InternalScanner;
41 import org.apache.hadoop.hbase.regionserver.ScanType;
42 import org.apache.hadoop.hbase.regionserver.Store;
43 import org.apache.hadoop.hbase.regionserver.StoreFile;
44 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
45 import org.apache.hadoop.hbase.regionserver.StoreScanner;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.util.StringUtils;
49
50
51
52
53
54 @InterfaceAudience.Private
55 public abstract class Compactor {
/** Logger shared by all compactor instances. */
private static final Log LOG = LogFactory.getLog(Compactor.class);
/**
 * Progress tracker returned by {@link #getProgress()} and updated by performCompaction.
 * It is not assigned anywhere in this class; presumably subclasses set it before compacting.
 */
protected CompactionProgress progress;
/** Configuration handed to the constructor. */
protected Configuration conf;
/** Store handed to the constructor; source of family, region and coprocessor information. */
protected Store store;

/** Limit passed to each scanner.next() call; read from HConstants.COMPACTION_KV_MAX. */
private int compactionKVMax;
/** Compaction compression of the store's family, or NONE when the store has no family. */
protected Compression.Algorithm compactionCompression;
63
64
/**
 * Binds this compactor to a configuration and a store.
 * Reads the per-call scan limit from {@code HConstants.COMPACTION_KV_MAX} (falling back to
 * {@code HConstants.COMPACTION_KV_MAX_DEFAULT}) and picks the compaction compression from the
 * store's family, using {@code NONE} when the store reports no family.
 *
 * @param conf configuration to read compaction settings from
 * @param store store this compactor works on
 */
Compactor(final Configuration conf, final Store store) {
  this.conf = conf;
  this.store = store;
  this.compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
  if (store.getFamily() == null) {
    this.compactionCompression = Compression.Algorithm.NONE;
  } else {
    this.compactionCompression = store.getFamily().getCompactionCompression();
  }
}
73
74
75
76
/**
 * Destination for compacted cells. performCompaction calls {@link #append(KeyValue)} once for
 * every cell it reads from the scanner.
 */
public interface CellSink {
/**
 * Accepts one cell.
 *
 * @param kv the cell to write
 * @throws IOException if the cell cannot be written
 */
void append(KeyValue kv) throws IOException;
}
80
81 public CompactionProgress getProgress() {
82 return this.progress;
83 }
84
85
/** Aggregate information about a set of store files, as filled in by getFileDetails. */
protected static class FileDetails {
/** Sum of the entry counts of all files that had a reader (an upper bound on output keys). */
public long maxKeyCount = 0;
/**
 * Smallest EARLIEST_PUT_TS found in the files, or HConstants.OLDEST_TIMESTAMP once any file
 * lacks that entry. Only updated when put timestamps are requested.
 */
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
/** Largest max sequence id among the files. */
public long maxSeqId = 0;
/** Largest MAX_MEMSTORE_TS_KEY value found in the files' file info. */
public long maxMVCCReadpoint = 0;
/** Largest MAX_TAGS_LEN value found in the files' file info. */
public int maxTagsLength = 0;
}
98
99
100
101
102
103
104
105 protected FileDetails getFileDetails(
106 Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
107 FileDetails fd = new FileDetails();
108
109 for (StoreFile file : filesToCompact) {
110 long seqNum = file.getMaxSequenceId();
111 fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
112 StoreFile.Reader r = file.getReader();
113 if (r == null) {
114 LOG.warn("Null reader for " + file.getPath());
115 continue;
116 }
117
118
119
120 long keyCount = r.getEntries();
121 fd.maxKeyCount += keyCount;
122
123 Map<byte[], byte[]> fileInfo = r.loadFileInfo();
124 byte tmp[] = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
125 if (tmp != null) {
126 fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
127 }
128 tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
129 if (tmp != null) {
130 fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
131 }
132
133
134 long earliestPutTs = 0;
135 if (calculatePutTs) {
136 tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
137 if (tmp == null) {
138
139
140 fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
141 } else {
142 earliestPutTs = Bytes.toLong(tmp);
143 fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
144 }
145 }
146 if (LOG.isDebugEnabled()) {
147 LOG.debug("Compacting " + file +
148 ", keycount=" + keyCount +
149 ", bloomtype=" + r.getBloomFilterType().toString() +
150 ", size=" + StringUtils.humanReadableInt(r.length()) +
151 ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
152 ", seqNum=" + seqNum +
153 (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
154 }
155 }
156 return fd;
157 }
158
159
160
161
162
163
164 protected List<StoreFileScanner> createFileScanners(
165 final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
166 return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
167 smallestReadPoint);
168 }
169
170 protected long getSmallestReadPoint() {
171 return store.getSmallestReadPoint();
172 }
173
174
175
176
177
178
179
180
181
182 protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
183 ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
184 if (store.getCoprocessorHost() == null) return null;
185 return store.getCoprocessorHost()
186 .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
187 }
188
189
190
191
192
193
194
195
196 protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
197 ScanType scanType, InternalScanner scanner) throws IOException {
198 if (store.getCoprocessorHost() == null) return scanner;
199 return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
200 }
201
202
203
204
205
206
207
208
209 protected boolean performCompaction(InternalScanner scanner, CellSink writer,
210 long smallestReadPoint, CompactionThroughputController throughputController)
211 throws IOException {
212 long bytesWritten = 0;
213 long bytesWrittenProgress = 0;
214
215
216 List<Cell> kvs = new ArrayList<Cell>();
217 long closeCheckInterval = HStore.getCloseCheckInterval();
218 long lastMillis = 0;
219 if (LOG.isDebugEnabled()) {
220 lastMillis = EnvironmentEdgeManager.currentTimeMillis();
221 }
222 String compactionName =
223 store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString();
224 long now = 0;
225 boolean hasMore;
226 throughputController.start(compactionName);
227 try {
228 do {
229 hasMore = scanner.next(kvs, compactionKVMax);
230 if (LOG.isDebugEnabled()) {
231 now = EnvironmentEdgeManager.currentTimeMillis();
232 }
233
234 for (Cell c : kvs) {
235 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
236 if (kv.getMvccVersion() <= smallestReadPoint) {
237 kv.setMvccVersion(0);
238 }
239 writer.append(kv);
240 int len = kv.getLength();
241 ++progress.currentCompactedKVs;
242 progress.totalCompactedSize += len;
243 if (LOG.isDebugEnabled()) {
244 bytesWrittenProgress += len;
245 }
246 throughputController.control(compactionName, len);
247
248
249 if (closeCheckInterval > 0) {
250 bytesWritten += len;
251 if (bytesWritten > closeCheckInterval) {
252 bytesWritten = 0;
253 if (!store.areWritesEnabled()) {
254 progress.cancel();
255 return false;
256 }
257 }
258 }
259 }
260
261
262 if (LOG.isDebugEnabled()) {
263 if ((now - lastMillis) >= 60 * 1000) {
264 LOG.debug("Compaction progress: "
265 + compactionName
266 + " "
267 + progress
268 + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
269 / ((now - lastMillis) / 1000.0)) + ", throughputController is "
270 + throughputController);
271 lastMillis = now;
272 bytesWrittenProgress = 0;
273 }
274 }
275 kvs.clear();
276 } while (hasMore);
277 } catch (InterruptedException e) {
278 throw new InterruptedIOException("Interrupted while control throughput of compacting "
279 + compactionName);
280 } finally {
281 throughputController.finish(compactionName);
282 progress.complete();
283 }
284 return true;
285 }
286
287
288
289
290
291
292
293
294
295 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
296 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
297 Scan scan = new Scan();
298 scan.setMaxVersions(store.getFamily().getMaxVersions());
299 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
300 scanType, smallestReadPoint, earliestPutTs);
301 }
302
303
304
305
306
307
308
309
310
311
312 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
313 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
314 byte[] dropDeletesToRow) throws IOException {
315 Scan scan = new Scan();
316 scan.setMaxVersions(store.getFamily().getMaxVersions());
317 return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
318 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
319 }
320 }