View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  import static org.mockito.Matchers.any;
28  import static org.mockito.Mockito.doAnswer;
29  import static org.mockito.Mockito.mock;
30  import static org.mockito.Mockito.spy;
31  import static org.mockito.Mockito.when;
32  
33  import java.io.IOException;
34  import java.util.ArrayList;
35  import java.util.Collection;
36  import java.util.Collections;
37  import java.util.List;
38  import java.util.concurrent.CountDownLatch;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FSDataOutputStream;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HBaseTestCase;
49  import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
50  import org.apache.hadoop.hbase.HBaseTestingUtility;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.client.Delete;
54  import org.apache.hadoop.hbase.client.Durability;
55  import org.apache.hadoop.hbase.client.Put;
56  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
57  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
58  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
59  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
60  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
61  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
62  import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
63  import org.apache.hadoop.hbase.regionserver.wal.HLog;
64  import org.apache.hadoop.hbase.testclassification.MediumTests;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.hbase.util.Threads;
68  import org.junit.After;
69  import org.junit.Assume;
70  import org.junit.Before;
71  import org.junit.Rule;
72  import org.junit.Test;
73  import org.junit.experimental.categories.Category;
74  import org.junit.rules.TestName;
75  import org.mockito.Mockito;
76  import org.mockito.invocation.InvocationOnMock;
77  import org.mockito.stubbing.Answer;
78  
79  
80  /**
81   * Test compaction framework and common functions
82   */
83  @Category(MediumTests.class)
84  public class TestCompaction {
85    @Rule public TestName name = new TestName();
86    static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
87    private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
88    protected Configuration conf = UTIL.getConfiguration();
89    
90    private HRegion r = null;
91    private HTableDescriptor htd = null;
92    private static final byte [] COLUMN_FAMILY = fam1;
93    private final byte [] STARTROW = Bytes.toBytes(START_KEY);
94    private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
95    private int compactionThreshold;
96    private byte[] secondRowBytes, thirdRowBytes;
97    private static final long MAX_FILES_TO_COMPACT = 10;
98  
99    /** constructor */
100   public TestCompaction() {
101     super();
102 
103     // Set cache flush size to 1MB
104     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
105     conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
106     conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
107       NoLimitCompactionThroughputController.class.getName());
108     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
109 
110     secondRowBytes = START_KEY_BYTES.clone();
111     // Increment the least significant character so we get to next row.
112     secondRowBytes[START_KEY_BYTES.length - 1]++;
113     thirdRowBytes = START_KEY_BYTES.clone();
114     thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
115   }
116 
117   @Before
118   public void setUp() throws Exception {
119     this.htd = UTIL.createTableDescriptor(name.getMethodName());
120     this.r = UTIL.createLocalHRegion(htd, null, null);
121   }
122 
123   @After
124   public void tearDown() throws Exception {
125     HLog hlog = r.getLog();
126     this.r.close();
127     hlog.closeAndDelete();
128   }
129 
130   /**
131    * Verify that you can stop a long-running compaction
132    * (used during RS shutdown)
133    * @throws Exception
134    */
135   @Test
136   public void testInterruptCompaction() throws Exception {
137     assertEquals(0, count());
138 
139     // lower the polling interval for this test
140     int origWI = HStore.closeCheckInterval;
141     HStore.closeCheckInterval = 10*1000; // 10 KB
142 
143     try {
144       // Create a couple store files w/ 15KB (over 10KB interval)
145       int jmax = (int) Math.ceil(15.0/compactionThreshold);
146       byte [] pad = new byte[1000]; // 1 KB chunk
147       for (int i = 0; i < compactionThreshold; i++) {
148         HRegionIncommon loader = new HRegionIncommon(r);
149         Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
150         p.setDurability(Durability.SKIP_WAL);
151         for (int j = 0; j < jmax; j++) {
152           p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
153         }
154         HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
155         loader.put(p);
156         loader.flushcache();
157       }
158 
159       HRegion spyR = spy(r);
160       doAnswer(new Answer() {
161         public Object answer(InvocationOnMock invocation) throws Throwable {
162           r.writestate.writesEnabled = false;
163           return invocation.callRealMethod();
164         }
165       }).when(spyR).doRegionCompactionPrep();
166 
167       // force a minor compaction, but not before requesting a stop
168       spyR.compactStores();
169 
170       // ensure that the compaction stopped, all old files are intact,
171       Store s = r.stores.get(COLUMN_FAMILY);
172       assertEquals(compactionThreshold, s.getStorefilesCount());
173       assertTrue(s.getStorefilesSize() > 15*1000);
174       // and no new store files persisted past compactStores()
175       FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
176       assertEquals(0, ls.length);
177 
178     } finally {
179       // don't mess up future tests
180       r.writestate.writesEnabled = true;
181       HStore.closeCheckInterval = origWI;
182 
183       // Delete all Store information once done using
184       for (int i = 0; i < compactionThreshold; i++) {
185         Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
186         byte [][] famAndQf = {COLUMN_FAMILY, null};
187         delete.deleteFamily(famAndQf[0]);
188         r.delete(delete);
189       }
190       r.flushcache();
191 
192       // Multiple versions allowed for an entry, so the delete isn't enough
193       // Lower TTL and expire to ensure that all our entries have been wiped
194       final int ttl = 1000;
195       for (Store hstore: this.r.stores.values()) {
196         HStore store = (HStore)hstore;
197         ScanInfo old = store.getScanInfo();
198         ScanInfo si = new ScanInfo(old.getFamily(),
199             old.getMinVersions(), old.getMaxVersions(), ttl,
200             old.getKeepDeletedCells(), 0, old.getComparator());
201         store.setScanInfo(si);
202       }
203       Thread.sleep(ttl);
204 
205       r.compactStores(true);
206       assertEquals(0, count());
207     }
208   }
209 
210   private int count() throws IOException {
211     int count = 0;
212     for (StoreFile f: this.r.stores.
213         get(COLUMN_FAMILY_TEXT).getStorefiles()) {
214       HFileScanner scanner = f.getReader().getScanner(false, false);
215       if (!scanner.seekTo()) {
216         continue;
217       }
218       do {
219         count++;
220       } while(scanner.next());
221     }
222     return count;
223   }
224 
225   private void createStoreFile(final HRegion region) throws IOException {
226     createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
227   }
228 
229   private void createStoreFile(final HRegion region, String family) throws IOException {
230     HRegionIncommon loader = new HRegionIncommon(region);
231     HBaseTestCase.addContent(loader, family);
232     loader.flushcache();
233   }
234 
235   @Test
236   public void testCompactionWithCorruptResult() throws Exception {
237     int nfiles = 10;
238     for (int i = 0; i < nfiles; i++) {
239       createStoreFile(r);
240     }
241     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
242 
243     Collection<StoreFile> storeFiles = store.getStorefiles();
244     DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
245     tool.compactForTesting(storeFiles, false);
246 
247     // Now lets corrupt the compacted file.
248     FileSystem fs = store.getFileSystem();
249     // default compaction policy created one and only one new compacted file
250     Path dstPath = store.getRegionFileSystem().createTempName();
251     FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short)3, (long)1024, null);
252     stream.writeChars("CORRUPT FILE!!!!");
253     stream.close();
254     Path origPath = store.getRegionFileSystem().commitStoreFile(
255       Bytes.toString(COLUMN_FAMILY), dstPath);
256 
257     try {
258       ((HStore)store).moveFileIntoPlace(origPath);
259     } catch (Exception e) {
260       // The complete compaction should fail and the corrupt file should remain
261       // in the 'tmp' directory;
262       assert (fs.exists(origPath));
263       assert (!fs.exists(dstPath));
264       System.out.println("testCompactionWithCorruptResult Passed");
265       return;
266     }
267     fail("testCompactionWithCorruptResult failed since no exception was" +
268         "thrown while completing a corrupt file");
269   }
270 
271   /**
272    * Create a custom compaction request and be sure that we can track it through the queue, knowing
273    * when the compaction is completed.
274    */
275   @Test
276   public void testTrackingCompactionRequest() throws Exception {
277     // setup a compact/split thread on a mock server
278     HRegionServer mockServer = Mockito.mock(HRegionServer.class);
279     Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
280     CompactSplitThread thread = new CompactSplitThread(mockServer);
281     Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
282 
283     // setup a region/store with some files
284     Store store = r.getStore(COLUMN_FAMILY);
285     createStoreFile(r);
286     for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
287       createStoreFile(r);
288     }
289 
290     CountDownLatch latch = new CountDownLatch(1);
291     TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
292     thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
293     // wait for the latch to complete.
294     latch.await();
295 
296     thread.interruptIfNecessary();
297   }
298 
299   /**
300    * HBASE-7947: Regression test to ensure adding to the correct list in the
301    * {@link CompactSplitThread}
302    * @throws Exception on failure
303    */
304   @Test
305   public void testMultipleCustomCompactionRequests() throws Exception {
306     // setup a compact/split thread on a mock server
307     HRegionServer mockServer = Mockito.mock(HRegionServer.class);
308     Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
309     CompactSplitThread thread = new CompactSplitThread(mockServer);
310     Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
311 
312     // setup a region/store with some files
313     int numStores = r.getStores().size();
314     List<Pair<CompactionRequest, Store>> requests =
315         new ArrayList<Pair<CompactionRequest, Store>>(numStores);
316     CountDownLatch latch = new CountDownLatch(numStores);
317     // create some store files and setup requests for each store on which we want to do a
318     // compaction
319     for (Store store : r.getStores().values()) {
320       createStoreFile(r, store.getColumnFamilyName());
321       createStoreFile(r, store.getColumnFamilyName());
322       createStoreFile(r, store.getColumnFamilyName());
323       requests
324           .add(new Pair<CompactionRequest, Store>(new TrackableCompactionRequest(latch), store));
325     }
326 
327     thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
328       Collections.unmodifiableList(requests));
329 
330     // wait for the latch to complete.
331     latch.await();
332 
333     thread.interruptIfNecessary();
334   }
335 
336   private class StoreMockMaker extends StatefulStoreMockMaker {
337     public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
338     public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
339     private ArrayList<Integer> results;
340 
341     public StoreMockMaker(ArrayList<Integer> results) {
342       this.results = results;
343     }
344 
345     public class TestCompactionContext extends CompactionContext {
346       private List<StoreFile> selectedFiles;
347       public TestCompactionContext(List<StoreFile> selectedFiles) {
348         super();
349         this.selectedFiles = selectedFiles;
350       }
351 
352       @Override
353       public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
354         return new ArrayList<StoreFile>();
355       }
356 
357       @Override
358       public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
359           boolean mayUseOffPeak, boolean forceMajor) throws IOException {
360         this.request = new CompactionRequest(selectedFiles);
361         this.request.setPriority(getPriority());
362         return true;
363       }
364 
365       @Override
366       public List<Path> compact(CompactionThroughputController throughputController)
367           throws IOException {
368         finishCompaction(this.selectedFiles);
369         return new ArrayList<Path>();
370       }
371     }
372 
373     @Override
374     public synchronized CompactionContext selectCompaction() {
375       CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
376       compacting.addAll(notCompacting);
377       notCompacting.clear();
378       try {
379         ctx.select(null, false, false, false);
380       } catch (IOException ex) {
381         fail("Shouldn't happen");
382       }
383       return ctx;
384     }
385 
386     @Override
387     public synchronized void cancelCompaction(Object object) {
388       TestCompactionContext ctx = (TestCompactionContext)object;
389       compacting.removeAll(ctx.selectedFiles);
390       notCompacting.addAll(ctx.selectedFiles);
391     }
392 
393     public synchronized void finishCompaction(List<StoreFile> sfs) {
394       if (sfs.isEmpty()) return;
395       synchronized (results) {
396         results.add(sfs.size());
397       }
398       compacting.removeAll(sfs);
399     }
400 
401     @Override
402     public int getPriority() {
403       return 7 - compacting.size() - notCompacting.size();
404     }
405   }
406 
407   public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
408     BlockingCompactionContext blocked = null;
409 
410     public class BlockingCompactionContext extends CompactionContext {
411       public volatile boolean isInCompact = false;
412 
413       public void unblock() {
414         synchronized (this) { this.notifyAll(); }
415       }
416 
417       @Override
418       public List<Path> compact(CompactionThroughputController throughputController)
419           throws IOException {
420         try {
421           isInCompact = true;
422           synchronized (this) {
423             this.wait();
424           }
425         } catch (InterruptedException e) {
426           Assume.assumeNoException(e);
427         }
428         return new ArrayList<Path>();
429       }
430 
431       @Override
432       public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
433         return new ArrayList<StoreFile>();
434       }
435 
436       @Override
437       public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
438           throws IOException {
439         this.request = new CompactionRequest(new ArrayList<StoreFile>());
440         return true;
441       }
442     }
443 
444     @Override
445     public CompactionContext selectCompaction() {
446       this.blocked = new BlockingCompactionContext();
447       try {
448         this.blocked.select(null, false, false, false);
449       } catch (IOException ex) {
450         fail("Shouldn't happen");
451       }
452       return this.blocked;
453     }
454 
455     @Override
456     public void cancelCompaction(Object object) {}
457 
458     public int getPriority() {
459       return Integer.MIN_VALUE; // some invalid value, see createStoreMock
460     }
461 
462     public BlockingCompactionContext waitForBlocking() {
463       while (this.blocked == null || !this.blocked.isInCompact) {
464         Threads.sleepWithoutInterrupt(50);
465       }
466       BlockingCompactionContext ctx = this.blocked;
467       this.blocked = null;
468       return ctx;
469     }
470 
471     @Override
472     public Store createStoreMock(String name) throws Exception {
473       return createStoreMock(Integer.MIN_VALUE, name);
474     }
475 
476     public Store createStoreMock(int priority, String name) throws Exception {
477       // Override the mock to always return the specified priority.
478       Store s = super.createStoreMock(name);
479       when(s.getCompactPriority()).thenReturn(priority);
480       return s;
481     }
482   }
483 
484   /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
485   @Test
486   public void testCompactionQueuePriorities() throws Exception {
487     // Setup a compact/split thread on a mock server.
488     final Configuration conf = HBaseConfiguration.create();
489     HRegionServer mockServer = mock(HRegionServer.class);
490     when(mockServer.isStopped()).thenReturn(false);
491     when(mockServer.getConfiguration()).thenReturn(conf);
492     CompactSplitThread cst = new CompactSplitThread(mockServer);
493     when(mockServer.getCompactSplitThread()).thenReturn(cst);
494 
495     // Set up the region mock that redirects compactions.
496     HRegion r = mock(HRegion.class);
497     when(
498       r.compact(any(CompactionContext.class), any(Store.class),
499         any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
500       public Boolean answer(InvocationOnMock invocation) throws Throwable {
501         ((CompactionContext)invocation.getArguments()[0]).compact(
502           (CompactionThroughputController)invocation.getArguments()[2]);
503         return true;
504       }
505     });
506 
507     // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
508     ArrayList<Integer> results = new ArrayList<Integer>();
509     StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
510     Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
511     BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
512 
513     // First, block the compaction thread so that we could muck with queue.
514     cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
515     BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
516 
517     // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
518     for (int i = 0; i < 4; ++i) {
519       sm.notCompacting.add(createFile());
520     }
521     cst.requestSystemCompaction(r, store, "s1-pri3");
522     for (int i = 0; i < 3; ++i) {
523       sm2.notCompacting.add(createFile());
524     }
525     cst.requestSystemCompaction(r, store2, "s2-pri4");
526     // Now add 2 more files to store1 and queue compaction - pri 1.
527     for (int i = 0; i < 2; ++i) {
528       sm.notCompacting.add(createFile());
529     }
530     cst.requestSystemCompaction(r, store, "s1-pri1");
531     // Finally add blocking compaction with priority 2.
532     cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
533 
534     // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
535     currentBlock.unblock();
536     currentBlock = blocker.waitForBlocking();
537     // Pri1 should have "compacted" all 6 files.
538     assertEquals(1, results.size());
539     assertEquals(6, results.get(0).intValue());
540     // Add 2 files to store 1 (it has 2 files now).
541     for (int i = 0; i < 2; ++i) {
542       sm.notCompacting.add(createFile());
543     }
544     // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
545     // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
546     cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
547     currentBlock.unblock();
548     currentBlock = blocker.waitForBlocking();
549     assertEquals(3, results.size());
550     assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
551     assertEquals(2, results.get(2).intValue());
552 
553     currentBlock.unblock();
554     cst.interruptIfNecessary();
555   }
556 
557   private static StoreFile createFile() throws Exception {
558     StoreFile sf = mock(StoreFile.class);
559     when(sf.getPath()).thenReturn(new Path("file"));
560     StoreFile.Reader r = mock(StoreFile.Reader.class);
561     when(r.length()).thenReturn(10L);
562     when(sf.getReader()).thenReturn(r);
563     return sf;
564   }
565 
566   /**
567    * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
568    */
569   public static class TrackableCompactionRequest extends CompactionRequest {
570     private CountDownLatch done;
571 
572     /**
573      * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
574      * compaction before being used.
575      */
576     public TrackableCompactionRequest(CountDownLatch finished) {
577       super();
578       this.done = finished;
579     }
580 
581     @Override
582     public void afterExecute() {
583       super.afterExecute();
584       this.done.countDown();
585     }
586   }
587 }