1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.*;
21 import static org.mockito.Mockito.*;
22 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
23 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
24 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
25
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.TreeMap;
31
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.HBaseConfiguration;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.KeyValue;
40 import org.apache.hadoop.hbase.KeyValue.KVComparator;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.io.compress.Compression;
43 import org.apache.hadoop.hbase.io.hfile.HFile;
44 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
45 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
46 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
47 import org.apache.hadoop.hbase.testclassification.SmallTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53
54
55 @Category(SmallTests.class)
56 public class TestStripeCompactor {
57 private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
58 private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
59
60 private static final byte[] KEY_B = Bytes.toBytes("bbb");
61 private static final byte[] KEY_C = Bytes.toBytes("ccc");
62 private static final byte[] KEY_D = Bytes.toBytes("ddd");
63
64 private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
65 private static final KeyValue KV_B = kvAfter(KEY_B);
66 private static final KeyValue KV_C = kvAfter(KEY_C);
67 private static final KeyValue KV_D = kvAfter(KEY_D);
68
69 private static KeyValue kvAfter(byte[] key) {
70 return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
71 }
72
73 private static <T> T[] a(T... a) {
74 return a;
75 }
76
77 private static KeyValue[] e() {
78 return TestStripeCompactor.<KeyValue>a();
79 }
80
81 @Test
82 public void testBoundaryCompactions() throws Exception {
83
84 verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
85 a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
86 verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
87 verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
88 }
89
90 @Test
91 public void testBoundaryCompactionEmptyFiles() throws Exception {
92
93 verifyBoundaryCompaction(
94 a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
95 verifyBoundaryCompaction(a(KV_A, KV_C),
96 a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
97
98 verifyBoundaryCompaction(
99 e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
100
101 verifyBoundaryCompaction(
102 e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
103 verifyBoundaryCompaction(
104 e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
105
106 verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
107 a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
108 verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
109 a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
110
111 }
112
113 public static void verifyBoundaryCompaction(
114 KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
115 verifyBoundaryCompaction(input, boundaries, output, null, null, true);
116 }
117
118 public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
119 KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
120 throws Exception {
121 StoreFileWritersCapture writers = new StoreFileWritersCapture();
122 StripeCompactor sc = createCompactor(writers, input);
123 List<Path> paths =
124 sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
125 NoLimitCompactionThroughputController.INSTANCE);
126 writers.verifyKvs(output, allFiles, true);
127 if (allFiles) {
128 assertEquals(output.length, paths.size());
129 writers.verifyBoundaries(boundaries);
130 }
131 }
132
133 @Test
134 public void testSizeCompactions() throws Exception {
135
136 verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
137 a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
138 verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
139 a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
140 verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
141
142 verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
143 a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
144 verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
145 a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
146
147 verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
148 a(a(KV_A), a(KV_B, KV_C, KV_D)));
149 verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
150 new KeyValue[][] { a(KV_A, KV_B, KV_C) });
151
152 verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
153 a(a(KV_A, KV_B), a(KV_C, KV_D)));
154 }
155
156 public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
157 byte[] left, byte[] right, KeyValue[][] output) throws Exception {
158 StoreFileWritersCapture writers = new StoreFileWritersCapture();
159 StripeCompactor sc = createCompactor(writers, input);
160 List<Path> paths =
161 sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
162 NoLimitCompactionThroughputController.INSTANCE);
163 assertEquals(output.length, paths.size());
164 writers.verifyKvs(output, true, true);
165 List<byte[]> boundaries = new ArrayList<byte[]>();
166 boundaries.add(left);
167 for (int i = 1; i < output.length; ++i) {
168 boundaries.add(output[i][0].getRow());
169 }
170 boundaries.add(right);
171 writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
172 }
173
174 private static StripeCompactor createCompactor(
175 StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
176 Configuration conf = HBaseConfiguration.create();
177 final Scanner scanner = new Scanner(input);
178
179
180 HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
181 ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, new KVComparator());
182 Store store = mock(Store.class);
183 when(store.getFamily()).thenReturn(col);
184 when(store.getScanInfo()).thenReturn(si);
185 when(store.areWritesEnabled()).thenReturn(true);
186 when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
187 when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
188 when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
189 anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
190 when(store.getComparator()).thenReturn(new KVComparator());
191
192 return new StripeCompactor(conf, store) {
193 @Override
194 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
195 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
196 byte[] dropDeletesToRow) throws IOException {
197 return scanner;
198 }
199
200 @Override
201 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
202 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
203 return scanner;
204 }
205 };
206 }
207
208 private static CompactionRequest createDummyRequest() throws Exception {
209
210
211 StoreFile sf = mock(StoreFile.class);
212 StoreFile.Reader r = mock(StoreFile.Reader.class);
213 when(r.length()).thenReturn(1L);
214 when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
215 when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
216 when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
217 .thenReturn(mock(StoreFileScanner.class));
218 when(sf.getReader()).thenReturn(r);
219 when(sf.createReader()).thenReturn(r);
220 return new CompactionRequest(Arrays.asList(sf));
221 }
222
223 private static class Scanner implements InternalScanner {
224 private final ArrayList<KeyValue> kvs;
225 public Scanner(KeyValue... kvs) {
226 this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
227 }
228
229 @Override
230 public boolean next(List<Cell> results) throws IOException {
231 if (kvs.isEmpty()) return false;
232 results.add(kvs.remove(0));
233 return !kvs.isEmpty();
234 }
235 @Override
236 public boolean next(List<Cell> result, int limit) throws IOException {
237 return next(result);
238 }
239 @Override
240 public void close() throws IOException {}
241 }
242
243
244 public static class StoreFileWritersCapture implements
245 Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
246 public static class Writer {
247 public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
248 public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
249 }
250
251 private List<Writer> writers = new ArrayList<Writer>();
252
253 @Override
254 public StoreFile.Writer createWriter() throws IOException {
255 final Writer realWriter = new Writer();
256 writers.add(realWriter);
257 StoreFile.Writer writer = mock(StoreFile.Writer.class);
258 doAnswer(new Answer<Object>() {
259 public Object answer(InvocationOnMock invocation) {
260 return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
261 }}).when(writer).append(any(KeyValue.class));
262 doAnswer(new Answer<Object>() {
263 public Object answer(InvocationOnMock invocation) {
264 Object[] args = invocation.getArguments();
265 return realWriter.data.put((byte[])args[0], (byte[])args[1]);
266 }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
267 return writer;
268 }
269
270 @Override
271 public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
272 return createWriter();
273 }
274
275 public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
276 if (allFiles) {
277 assertEquals(kvss.length, writers.size());
278 }
279 int skippedWriters = 0;
280 for (int i = 0; i < kvss.length; ++i) {
281 KeyValue[] kvs = kvss[i];
282 if (kvs != null) {
283 Writer w = writers.get(i - skippedWriters);
284 if (requireMetadata) {
285 assertNotNull(w.data.get(STRIPE_START_KEY));
286 assertNotNull(w.data.get(STRIPE_END_KEY));
287 } else {
288 assertNull(w.data.get(STRIPE_START_KEY));
289 assertNull(w.data.get(STRIPE_END_KEY));
290 }
291 assertEquals(kvs.length, w.kvs.size());
292 for (int j = 0; j < kvs.length; ++j) {
293 assertEquals(kvs[j], w.kvs.get(j));
294 }
295 } else {
296 assertFalse(allFiles);
297 ++skippedWriters;
298 }
299 }
300 }
301
302 public void verifyBoundaries(byte[][] boundaries) {
303 assertEquals(boundaries.length - 1, writers.size());
304 for (int i = 0; i < writers.size(); ++i) {
305 assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
306 assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
307 }
308 }
309 }
310 }