1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27 import static org.mockito.Matchers.any;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.hbase.HBaseConfiguration;
48 import org.apache.hadoop.hbase.HBaseTestCase;
49 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
50 import org.apache.hadoop.hbase.HBaseTestingUtility;
51 import org.apache.hadoop.hbase.HConstants;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.client.Delete;
54 import org.apache.hadoop.hbase.client.Durability;
55 import org.apache.hadoop.hbase.client.Put;
56 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
57 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
58 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
59 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
60 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
61 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
62 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
63 import org.apache.hadoop.hbase.regionserver.wal.HLog;
64 import org.apache.hadoop.hbase.testclassification.MediumTests;
65 import org.apache.hadoop.hbase.util.Bytes;
66 import org.apache.hadoop.hbase.util.Pair;
67 import org.apache.hadoop.hbase.util.Threads;
68 import org.junit.After;
69 import org.junit.Assume;
70 import org.junit.Before;
71 import org.junit.Rule;
72 import org.junit.Test;
73 import org.junit.experimental.categories.Category;
74 import org.junit.rules.TestName;
75 import org.mockito.Mockito;
76 import org.mockito.invocation.InvocationOnMock;
77 import org.mockito.stubbing.Answer;
78
79
80
81
82
83 @Category(MediumTests.class)
84 public class TestCompaction {
85 @Rule public TestName name = new TestName();
86 static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
87 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
88 protected Configuration conf = UTIL.getConfiguration();
89
90 private HRegion r = null;
91 private HTableDescriptor htd = null;
92 private static final byte [] COLUMN_FAMILY = fam1;
93 private final byte [] STARTROW = Bytes.toBytes(START_KEY);
94 private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
95 private int compactionThreshold;
96 private byte[] secondRowBytes, thirdRowBytes;
97 private static final long MAX_FILES_TO_COMPACT = 10;
98
99
100 public TestCompaction() {
101 super();
102
103
104 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
105 conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
106 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
107 NoLimitCompactionThroughputController.class.getName());
108 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
109
110 secondRowBytes = START_KEY_BYTES.clone();
111
112 secondRowBytes[START_KEY_BYTES.length - 1]++;
113 thirdRowBytes = START_KEY_BYTES.clone();
114 thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
115 }
116
117 @Before
118 public void setUp() throws Exception {
119 this.htd = UTIL.createTableDescriptor(name.getMethodName());
120 this.r = UTIL.createLocalHRegion(htd, null, null);
121 }
122
123 @After
124 public void tearDown() throws Exception {
125 HLog hlog = r.getLog();
126 this.r.close();
127 hlog.closeAndDelete();
128 }
129
130
131
132
133
134
135 @Test
136 public void testInterruptCompaction() throws Exception {
137 assertEquals(0, count());
138
139
140 int origWI = HStore.closeCheckInterval;
141 HStore.closeCheckInterval = 10*1000;
142
143 try {
144
145 int jmax = (int) Math.ceil(15.0/compactionThreshold);
146 byte [] pad = new byte[1000];
147 for (int i = 0; i < compactionThreshold; i++) {
148 HRegionIncommon loader = new HRegionIncommon(r);
149 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
150 p.setDurability(Durability.SKIP_WAL);
151 for (int j = 0; j < jmax; j++) {
152 p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
153 }
154 HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
155 loader.put(p);
156 loader.flushcache();
157 }
158
159 HRegion spyR = spy(r);
160 doAnswer(new Answer() {
161 public Object answer(InvocationOnMock invocation) throws Throwable {
162 r.writestate.writesEnabled = false;
163 return invocation.callRealMethod();
164 }
165 }).when(spyR).doRegionCompactionPrep();
166
167
168 spyR.compactStores();
169
170
171 Store s = r.stores.get(COLUMN_FAMILY);
172 assertEquals(compactionThreshold, s.getStorefilesCount());
173 assertTrue(s.getStorefilesSize() > 15*1000);
174
175 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
176 assertEquals(0, ls.length);
177
178 } finally {
179
180 r.writestate.writesEnabled = true;
181 HStore.closeCheckInterval = origWI;
182
183
184 for (int i = 0; i < compactionThreshold; i++) {
185 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
186 byte [][] famAndQf = {COLUMN_FAMILY, null};
187 delete.deleteFamily(famAndQf[0]);
188 r.delete(delete);
189 }
190 r.flushcache();
191
192
193
194 final int ttl = 1000;
195 for (Store hstore: this.r.stores.values()) {
196 HStore store = (HStore)hstore;
197 ScanInfo old = store.getScanInfo();
198 ScanInfo si = new ScanInfo(old.getFamily(),
199 old.getMinVersions(), old.getMaxVersions(), ttl,
200 old.getKeepDeletedCells(), 0, old.getComparator());
201 store.setScanInfo(si);
202 }
203 Thread.sleep(ttl);
204
205 r.compactStores(true);
206 assertEquals(0, count());
207 }
208 }
209
210 private int count() throws IOException {
211 int count = 0;
212 for (StoreFile f: this.r.stores.
213 get(COLUMN_FAMILY_TEXT).getStorefiles()) {
214 HFileScanner scanner = f.getReader().getScanner(false, false);
215 if (!scanner.seekTo()) {
216 continue;
217 }
218 do {
219 count++;
220 } while(scanner.next());
221 }
222 return count;
223 }
224
225 private void createStoreFile(final HRegion region) throws IOException {
226 createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
227 }
228
229 private void createStoreFile(final HRegion region, String family) throws IOException {
230 HRegionIncommon loader = new HRegionIncommon(region);
231 HBaseTestCase.addContent(loader, family);
232 loader.flushcache();
233 }
234
235 @Test
236 public void testCompactionWithCorruptResult() throws Exception {
237 int nfiles = 10;
238 for (int i = 0; i < nfiles; i++) {
239 createStoreFile(r);
240 }
241 HStore store = (HStore) r.getStore(COLUMN_FAMILY);
242
243 Collection<StoreFile> storeFiles = store.getStorefiles();
244 DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
245 tool.compactForTesting(storeFiles, false);
246
247
248 FileSystem fs = store.getFileSystem();
249
250 Path dstPath = store.getRegionFileSystem().createTempName();
251 FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, (long)1024, null);
252 stream.writeChars("CORRUPT FILE!!!!");
253 stream.close();
254 Path origPath = store.getRegionFileSystem().commitStoreFile(
255 Bytes.toString(COLUMN_FAMILY), dstPath);
256
257 try {
258 ((HStore)store).moveFileIntoPlace(origPath);
259 } catch (Exception e) {
260
261
262 assert (fs.exists(origPath));
263 assert (!fs.exists(dstPath));
264 System.out.println("testCompactionWithCorruptResult Passed");
265 return;
266 }
267 fail("testCompactionWithCorruptResult failed since no exception was" +
268 "thrown while completing a corrupt file");
269 }
270
271
272
273
274
275 @Test
276 public void testTrackingCompactionRequest() throws Exception {
277
278 HRegionServer mockServer = Mockito.mock(HRegionServer.class);
279 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
280 CompactSplitThread thread = new CompactSplitThread(mockServer);
281 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
282
283
284 Store store = r.getStore(COLUMN_FAMILY);
285 createStoreFile(r);
286 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
287 createStoreFile(r);
288 }
289
290 CountDownLatch latch = new CountDownLatch(1);
291 TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
292 thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
293
294 latch.await();
295
296 thread.interruptIfNecessary();
297 }
298
299
300
301
302
303
304 @Test
305 public void testMultipleCustomCompactionRequests() throws Exception {
306
307 HRegionServer mockServer = Mockito.mock(HRegionServer.class);
308 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
309 CompactSplitThread thread = new CompactSplitThread(mockServer);
310 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
311
312
313 int numStores = r.getStores().size();
314 List<Pair<CompactionRequest, Store>> requests =
315 new ArrayList<Pair<CompactionRequest, Store>>(numStores);
316 CountDownLatch latch = new CountDownLatch(numStores);
317
318
319 for (Store store : r.getStores().values()) {
320 createStoreFile(r, store.getColumnFamilyName());
321 createStoreFile(r, store.getColumnFamilyName());
322 createStoreFile(r, store.getColumnFamilyName());
323 requests
324 .add(new Pair<CompactionRequest, Store>(new TrackableCompactionRequest(latch), store));
325 }
326
327 thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
328 Collections.unmodifiableList(requests));
329
330
331 latch.await();
332
333 thread.interruptIfNecessary();
334 }
335
336 private class StoreMockMaker extends StatefulStoreMockMaker {
337 public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
338 public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
339 private ArrayList<Integer> results;
340
341 public StoreMockMaker(ArrayList<Integer> results) {
342 this.results = results;
343 }
344
345 public class TestCompactionContext extends CompactionContext {
346 private List<StoreFile> selectedFiles;
347 public TestCompactionContext(List<StoreFile> selectedFiles) {
348 super();
349 this.selectedFiles = selectedFiles;
350 }
351
352 @Override
353 public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
354 return new ArrayList<StoreFile>();
355 }
356
357 @Override
358 public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
359 boolean mayUseOffPeak, boolean forceMajor) throws IOException {
360 this.request = new CompactionRequest(selectedFiles);
361 this.request.setPriority(getPriority());
362 return true;
363 }
364
365 @Override
366 public List<Path> compact(CompactionThroughputController throughputController)
367 throws IOException {
368 finishCompaction(this.selectedFiles);
369 return new ArrayList<Path>();
370 }
371 }
372
373 @Override
374 public synchronized CompactionContext selectCompaction() {
375 CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
376 compacting.addAll(notCompacting);
377 notCompacting.clear();
378 try {
379 ctx.select(null, false, false, false);
380 } catch (IOException ex) {
381 fail("Shouldn't happen");
382 }
383 return ctx;
384 }
385
386 @Override
387 public synchronized void cancelCompaction(Object object) {
388 TestCompactionContext ctx = (TestCompactionContext)object;
389 compacting.removeAll(ctx.selectedFiles);
390 notCompacting.addAll(ctx.selectedFiles);
391 }
392
393 public synchronized void finishCompaction(List<StoreFile> sfs) {
394 if (sfs.isEmpty()) return;
395 synchronized (results) {
396 results.add(sfs.size());
397 }
398 compacting.removeAll(sfs);
399 }
400
401 @Override
402 public int getPriority() {
403 return 7 - compacting.size() - notCompacting.size();
404 }
405 }
406
407 public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
408 BlockingCompactionContext blocked = null;
409
410 public class BlockingCompactionContext extends CompactionContext {
411 public volatile boolean isInCompact = false;
412
413 public void unblock() {
414 synchronized (this) { this.notifyAll(); }
415 }
416
417 @Override
418 public List<Path> compact(CompactionThroughputController throughputController)
419 throws IOException {
420 try {
421 isInCompact = true;
422 synchronized (this) {
423 this.wait();
424 }
425 } catch (InterruptedException e) {
426 Assume.assumeNoException(e);
427 }
428 return new ArrayList<Path>();
429 }
430
431 @Override
432 public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
433 return new ArrayList<StoreFile>();
434 }
435
436 @Override
437 public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
438 throws IOException {
439 this.request = new CompactionRequest(new ArrayList<StoreFile>());
440 return true;
441 }
442 }
443
444 @Override
445 public CompactionContext selectCompaction() {
446 this.blocked = new BlockingCompactionContext();
447 try {
448 this.blocked.select(null, false, false, false);
449 } catch (IOException ex) {
450 fail("Shouldn't happen");
451 }
452 return this.blocked;
453 }
454
455 @Override
456 public void cancelCompaction(Object object) {}
457
458 public int getPriority() {
459 return Integer.MIN_VALUE;
460 }
461
462 public BlockingCompactionContext waitForBlocking() {
463 while (this.blocked == null || !this.blocked.isInCompact) {
464 Threads.sleepWithoutInterrupt(50);
465 }
466 BlockingCompactionContext ctx = this.blocked;
467 this.blocked = null;
468 return ctx;
469 }
470
471 @Override
472 public Store createStoreMock(String name) throws Exception {
473 return createStoreMock(Integer.MIN_VALUE, name);
474 }
475
476 public Store createStoreMock(int priority, String name) throws Exception {
477
478 Store s = super.createStoreMock(name);
479 when(s.getCompactPriority()).thenReturn(priority);
480 return s;
481 }
482 }
483
484
485 @Test
486 public void testCompactionQueuePriorities() throws Exception {
487
488 final Configuration conf = HBaseConfiguration.create();
489 HRegionServer mockServer = mock(HRegionServer.class);
490 when(mockServer.isStopped()).thenReturn(false);
491 when(mockServer.getConfiguration()).thenReturn(conf);
492 CompactSplitThread cst = new CompactSplitThread(mockServer);
493 when(mockServer.getCompactSplitThread()).thenReturn(cst);
494
495
496 HRegion r = mock(HRegion.class);
497 when(
498 r.compact(any(CompactionContext.class), any(Store.class),
499 any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
500 public Boolean answer(InvocationOnMock invocation) throws Throwable {
501 ((CompactionContext)invocation.getArguments()[0]).compact(
502 (CompactionThroughputController)invocation.getArguments()[2]);
503 return true;
504 }
505 });
506
507
508 ArrayList<Integer> results = new ArrayList<Integer>();
509 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
510 Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
511 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
512
513
514 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
515 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
516
517
518 for (int i = 0; i < 4; ++i) {
519 sm.notCompacting.add(createFile());
520 }
521 cst.requestSystemCompaction(r, store, "s1-pri3");
522 for (int i = 0; i < 3; ++i) {
523 sm2.notCompacting.add(createFile());
524 }
525 cst.requestSystemCompaction(r, store2, "s2-pri4");
526
527 for (int i = 0; i < 2; ++i) {
528 sm.notCompacting.add(createFile());
529 }
530 cst.requestSystemCompaction(r, store, "s1-pri1");
531
532 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
533
534
535 currentBlock.unblock();
536 currentBlock = blocker.waitForBlocking();
537
538 assertEquals(1, results.size());
539 assertEquals(6, results.get(0).intValue());
540
541 for (int i = 0; i < 2; ++i) {
542 sm.notCompacting.add(createFile());
543 }
544
545
546 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
547 currentBlock.unblock();
548 currentBlock = blocker.waitForBlocking();
549 assertEquals(3, results.size());
550 assertEquals(3, results.get(1).intValue());
551 assertEquals(2, results.get(2).intValue());
552
553 currentBlock.unblock();
554 cst.interruptIfNecessary();
555 }
556
557 private static StoreFile createFile() throws Exception {
558 StoreFile sf = mock(StoreFile.class);
559 when(sf.getPath()).thenReturn(new Path("file"));
560 StoreFile.Reader r = mock(StoreFile.Reader.class);
561 when(r.length()).thenReturn(10L);
562 when(sf.getReader()).thenReturn(r);
563 return sf;
564 }
565
566
567
568
569 public static class TrackableCompactionRequest extends CompactionRequest {
570 private CountDownLatch done;
571
572
573
574
575
576 public TrackableCompactionRequest(CountDownLatch finished) {
577 super();
578 this.done = finished;
579 }
580
581 @Override
582 public void afterExecute() {
583 super.afterExecute();
584 this.done.countDown();
585 }
586 }
587 }