1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.regionserver.InternalScanner;
33 import org.apache.hadoop.hbase.regionserver.ScanType;
34 import org.apache.hadoop.hbase.regionserver.Store;
35 import org.apache.hadoop.hbase.regionserver.StoreFile;
36 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class DefaultCompactor extends Compactor {
44 private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
45
46 public DefaultCompactor(final Configuration conf, final Store store) {
47 super(conf, store);
48 }
49
50
51
52
53 public List<Path> compact(final CompactionRequest request,
54 CompactionThroughputController throughputController) throws IOException {
55 FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
56 this.progress = new CompactionProgress(fd.maxKeyCount);
57
58
59 long smallestReadPoint = getSmallestReadPoint();
60
61 List<StoreFileScanner> scanners;
62 Collection<StoreFile> readersToClose;
63 if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
64
65
66 readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
67 for (StoreFile f : request.getFiles()) {
68 readersToClose.add(new StoreFile(f));
69 }
70 scanners = createFileScanners(readersToClose, smallestReadPoint);
71 } else {
72 readersToClose = Collections.emptyList();
73 scanners = createFileScanners(request.getFiles(), smallestReadPoint);
74 }
75
76 StoreFile.Writer writer = null;
77 List<Path> newFiles = new ArrayList<Path>();
78 IOException e = null;
79 try {
80 InternalScanner scanner = null;
81 try {
82
83 ScanType scanType =
84 request.isMajor() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
85 scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
86 if (scanner == null) {
87 scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
88 }
89 scanner = postCreateCoprocScanner(request, scanType, scanner);
90 if (scanner == null) {
91
92 return newFiles;
93 }
94
95
96 writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
97 fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
98 boolean finished =
99 performCompaction(scanner, writer, smallestReadPoint, throughputController);
100 if (!finished) {
101 writer.close();
102 store.getFileSystem().delete(writer.getPath(), false);
103 writer = null;
104 throw new InterruptedIOException( "Aborting compaction of store " + store +
105 " in region " + store.getRegionInfo().getRegionNameAsString() +
106 " because it was interrupted.");
107 }
108 } finally {
109 if (scanner != null) {
110 scanner.close();
111 }
112 }
113 } catch (IOException ioe) {
114 e = ioe;
115
116 throw ioe;
117 } finally {
118 try {
119 if (writer != null) {
120 if (e != null) {
121 writer.close();
122 } else {
123 writer.appendMetadata(fd.maxSeqId, request.isMajor());
124 writer.close();
125 newFiles.add(writer.getPath());
126 }
127 }
128 } finally {
129 for (StoreFile f : readersToClose) {
130 try {
131 f.closeReader(true);
132 } catch (IOException ioe) {
133 LOG.warn("Exception closing " + f, ioe);
134 }
135 }
136 }
137 }
138 return newFiles;
139 }
140
141
142
143
144
145
146
147
148
149
150
151 public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
152 throws IOException {
153 CompactionRequest cr = new CompactionRequest(filesToCompact);
154 cr.setIsMajor(isMajor);
155 return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE);
156 }
157 }