1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.SortedSet;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
36 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
37 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
38 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
39 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
40
41 import com.google.common.annotations.VisibleForTesting;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class StripeStoreFlusher extends StoreFlusher {
49 private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
50 private final Object flushLock = new Object();
51 private final StripeCompactionPolicy policy;
52 private final StripeCompactionPolicy.StripeInformationProvider stripes;
53
54 public StripeStoreFlusher(Configuration conf, Store store,
55 StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
56 super(conf, store);
57 this.policy = policy;
58 this.stripes = stripes;
59 }
60
61 @Override
62 public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
63 final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
64 throws IOException {
65 List<Path> result = new ArrayList<Path>();
66 int kvCount = snapshot.size();
67 if (kvCount == 0) return result;
68
69 long smallestReadPoint = store.getSmallestReadPoint();
70 InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
71 if (scanner == null) {
72 return result;
73 }
74
75
76 StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
77
78 long flushedBytes = 0;
79 boolean success = false;
80 StripeMultiFileWriter mw = null;
81 try {
82 mw = req.createWriter();
83 StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
84 StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
85 mw.init(storeScanner, factory, store.getComparator());
86
87 synchronized (flushLock) {
88 flushedBytes = performFlush(scanner, mw, smallestReadPoint);
89 result = mw.commitWriters(cacheFlushSeqNum, false);
90 success = true;
91 }
92 } finally {
93 if (!success && (mw != null)) {
94 for (Path leftoverFile : mw.abortWriters()) {
95 try {
96 store.getFileSystem().delete(leftoverFile, false);
97 } catch (Exception e) {
98 LOG.error("Failed to delete a file after failed flush: " + e);
99 }
100 }
101 }
102 flushedSize.set(flushedBytes);
103 try {
104 scanner.close();
105 } catch (IOException ex) {
106 LOG.warn("Failed to close flush scanner, ignoring", ex);
107 }
108 }
109 return result;
110 }
111
112 private StripeMultiFileWriter.WriterFactory createWriterFactory(
113 final TimeRangeTracker tracker, final long kvCount) {
114 return new StripeMultiFileWriter.WriterFactory() {
115 @Override
116 public Writer createWriter() throws IOException {
117 StoreFile.Writer writer = store.createWriterInTmp(
118 kvCount, store.getFamily().getCompression(), false, true, true);
119 writer.setTimeRangeTracker(tracker);
120 return writer;
121 }
122 };
123 }
124
125
126 public static class StripeFlushRequest {
127 @VisibleForTesting
128 public StripeMultiFileWriter createWriter() throws IOException {
129 StripeMultiFileWriter writer =
130 new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
131 writer.setNoStripeMetadata();
132 return writer;
133 }
134 }
135
136
137 public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
138 private final List<byte[]> targetBoundaries;
139
140
141 public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
142 this.targetBoundaries = targetBoundaries;
143 }
144
145 @Override
146 public StripeMultiFileWriter createWriter() throws IOException {
147 return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
148 }
149 }
150
151
152 public static class SizeStripeFlushRequest extends StripeFlushRequest {
153 private final int targetCount;
154 private final long targetKvs;
155
156
157
158
159
160
161 public SizeStripeFlushRequest(int targetCount, long targetKvs) {
162 this.targetCount = targetCount;
163 this.targetKvs = targetKvs;
164 }
165
166 @Override
167 public StripeMultiFileWriter createWriter() throws IOException {
168 return new StripeMultiFileWriter.SizeMultiWriter(
169 this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
170 }
171 }
172 }