/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
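/**
 * The default algorithm for selecting files for compaction. Combines the
 * compaction configuration with the provisional file selection it is given
 * to produce the list of suitable candidates for compaction.
 */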
@InterfaceAudience.Private
public class RatioBasedCompactionPolicy extends CompactionPolicy {
  private static final Log LOG = LogFactory.getLog(RatioBasedCompactionPolicy.class);

  public RatioBasedCompactionPolicy(Configuration conf,
      StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }

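  /**
   * @param candidateFiles candidate files, ordered from oldest to newest by seqId
   * @param filesCompacting files currently in flight
   * @return the candidates that are not blocked by an in-progress compaction
   */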
  private ArrayList<StoreFile> getCurrentEligibleFiles(
      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
    // Candidates are all store files not already in the compaction queue.
    if (!filesCompacting.isEmpty()) {
      // Exclude all files older than, and including, the newest file that is
      // currently being compacted; this allows us to compact concurrently.
      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidateFiles.indexOf(last);
      Preconditions.checkArgument(idx != -1);
      candidateFiles.subList(0, idx + 1).clear();
    }
    return candidateFiles;
  }

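  /**
   * Gives coprocessors a preview of the files a compaction may select, with
   * the files already being compacted excluded up front.
   */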
  public List<StoreFile> preSelectCompactionForCoprocessor(
      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
  }

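  /**
   * @param candidateFiles candidate files, ordered from oldest to newest by seqId.
   *   All files in the store.
   * @param filesCompacting files currently compacting
   * @param isUserCompaction whether the compaction was requested by the user
   * @param mayUseOffPeak whether off-peak settings may be used
   * @param forceMajor whether a major compaction should be forced
   * @return the subset copy of the candidate list that meets the compaction criteria
   * @throws IOException if candidate selection fails
   */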
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary compaction subject to filters below.
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because the ratio policy excludes
    // some non-compacting files from consideration (see getCurrentEligibleFiles).
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    if (!forceMajor) {
      candidateSelection = skipLargeFiles(candidateSelection);
    }

    // Force a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested
    // as a major compaction.
    // Or, if there are any references among the candidates.
    boolean majorCompaction = (
      (forceMajor && isUserCompaction)
      || ((forceMajor || isMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
      || StoreUtils.hasReferences(candidateSelection)
    );

    if (!majorCompaction) {
      // We're doing a minor compaction; let's see what files are applicable.
      candidateSelection = filterBulk(candidateSelection);
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection);
    }
    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
    CompactionRequest result = new CompactionRequest(candidateSelection);
    result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
    return result;
  }

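  /**
   * @param candidates pre-filtrate
   * @return filtered subset, excluding all files above maxCompactSize;
   *   reference files are always kept, since they must be compacted
   */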
  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
    int pos = 0;
    while (pos < candidates.size() && !candidates.get(pos).isReference()
        && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }

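  /**
   * @param candidates pre-filtrate
   * @return filtered subset, excluding all bulk-loaded files that are flagged
   *   for exclusion from minor compactions
   */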
  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates,
        new Predicate<StoreFile>() {
          @Override
          public boolean apply(StoreFile input) {
            return input.excludeFromMinorCompaction();
          }
        }));
    return candidates;
  }

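  /**
   * @param candidates pre-filtrate
   * @return filtered subset, capped at maxFilesToCompact files taken from the start
   */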
  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
            " files because of a user-requested major compaction");
      } else {
        LOG.debug("Too many admissible files. Excluding " + excess
            + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
    return candidates;
  }

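  /**
   * @param candidates pre-filtrate
   * @return the candidates, or an empty list if there are not enough files to compact
   */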
  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
    int minFiles = comConf.getMinFilesToCompact();
    if (candidates.size() < minFiles) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting files because we only have " + candidates.size() +
            " files ready for compaction. Need " + minFiles + " to initiate.");
      }
      candidates.clear();
    }
    return candidates;
  }

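  /**
   * Default minor compaction selection algorithm: choose a compaction selection
   * from the candidates.
   * Start at the oldest file and stop at the first file that meets the
   * compaction criteria:
   * (1) a recently-flushed, small file (i.e. <= minCompactSize), OR
   * (2) within the compaction ratio of the sum of the sizes of the newer files
   *     that would be compacted with it.
   * Given normal size skew, any newer files will also meet this criteria.
   * Additionally, if we are running off-peak, a configurable, more aggressive
   * ratio is allowed.
   * @param candidates pre-filtrate, ordered from oldest to newest by seqId
   * @param mayUseOffPeak whether the off-peak ratio may be used
   * @param mayBeStuck whether the store risks hitting the blocking file count
   * @return filtered subset
   */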
  ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
      boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }

    // We're doing a minor compaction; let's see what files are applicable.
    int start = 0;
    double ratio = comConf.getCompactionRatio();
    if (mayUseOffPeak) {
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // Get store file sizes for incremental compacting selection.
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];
    long[] sumSize = new long[countOfFiles];
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // Calculate the sum of fileSizes[i, i + maxFilesToCompact - 1) for the algorithm.
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

    // Skip the oldest files that are larger than both minCompactSize and the
    // ratio applied to the total size of the newer files they would be
    // compacted with.
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
        fileSizes[start] > Math.max(comConf.getMinCompactSize(),
            (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
          + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

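  /**
   * @param filesToCompact files to compact. Can be null.
   * @return true if we should run a major compaction.
   */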
  public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
      throws IOException {
    boolean result = false;
    long mcTime = getNextMajorCompactTime(filesToCompact);
    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
      // Major compaction time has elapsed.
      long cfTtl = this.storeConfigInfo.getStoreFileTtl();
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.iterator().next();
        Long minTimestamp = sf.getMinimumTimestamp();
        long oldest = (minTimestamp == null)
            ? Long.MIN_VALUE
            : now - minTimestamp.longValue();
        if (sf.isMajorCompaction() &&
            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
          float blockLocalityIndex = sf.getHDFSBlockDistribution()
              .getBlockLocalityIndex(HRegionServer.getHostname(comConf.conf));
          if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Major compaction triggered on only store " + this +
                  "; to make hdfs blocks local, current blockLocalityIndex is " +
                  blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                  ")");
            }
            result = true;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Skipping major compaction of " + this +
                  " because one (major) compacted file only, oldestTime " +
                  oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
                  blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
                  ")");
            }
          }
        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
          LOG.debug("Major compaction triggered on store " + this +
              ", because keyvalues outdated; time since last major compaction " +
              (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this +
              "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }

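  /**
   * @param filesToCompact the files the next major compaction would act on
   * @return the configured major compaction period with deterministic,
   *   per-store jitter applied, in milliseconds; 0 if there are no store files
   */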
  public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
    // Period between major compactions, in ms; skip jitter if not positive.
    long ret = comConf.getMajorCompactionPeriod();
    if (ret > 0) {
      // Apply the configured jitter fraction to spread major compactions out.
      double jitterPct = comConf.getMajorCompactionJitter();
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        // Deterministic jitter avoids a major compaction storm on restart.
        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
        if (seed != null) {
          double rnd = (new Random(seed)).nextDouble();
          ret += jitter - Math.round(2L * jitter * rnd);
        } else {
          ret = 0; // If the seed is null there are no store files, so no major compaction.
        }
      }
    }
    return ret;
  }

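  /**
   * @param compactionSize total size of some compaction
   * @return whether this should be a large or small compaction
   */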
  public boolean throttleCompaction(long compactionSize) {
    return compactionSize > comConf.getThrottlePoint();
  }

  public boolean needsCompaction(final Collection<StoreFile> storeFiles,
      final List<StoreFile> filesCompacting) {
    int numCandidates = storeFiles.size() - filesCompacting.size();
    return numCandidates >= comConf.getMinFilesToCompact();
  }
}