/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test log deletion as logs are rolled.
 */
@Category(LargeTests.class)
public class TestLogRolling {
  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
  private HRegionServer server;
  private HLog log;
  private String tableName;
  private byte[] value;
  private FileSystem fs;
  private MiniDFSCluster dfsCluster;
  private HBaseAdmin admin;
  private MiniHBaseCluster cluster;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /**
   * Constructor: builds a large (at least 1000 byte) value to write with each edit.
   */
  public TestLogRolling() {
    this.server = null;
    this.log = null;
    this.tableName = null;

    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    this.value = Bytes.toBytes(v.toString());
  }

  // Need to override this setup so we can edit the config before it gets sent
  // to the HDFS & HBase cluster startup.
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
    // profile. See HBASE-9337 for related issues.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    /**** configuration for testLogRolling ****/
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt(RpcClient.PING_INTERVAL_NAME, 10 * 1000);
    TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT, 10 * 1000);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(
        HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // make sure log.hflush() calls syncFs() to open a pipeline
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry creating a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  @Before
  public void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1, 1, 2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getHBaseAdmin();

    // disable region rebalancing (interferes with log watching)
    cluster.getMaster().balanceSwitch(false);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

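  /**
   * Waits until the region servers are up (by opening hbase:meta), creates the test table,
   * and writes 256 rows, pausing after every 32 writes so the log roller has a chance to run.
   */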
  private void startAndWriteData() throws IOException, InterruptedException {
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
    this.log = server.getWAL();

    HTable table = createTestTable(this.tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
      doPut(table, i);
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
  }

  /**
   * Tests that logs are deleted
   * @throws IOException
   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
   */
  @Test
  public void testLogRolling() throws Exception {
    this.tableName = getName();
    startAndWriteData();
    LOG.info("after writing there are " + ((FSHLog) log).getNumRolledLogFiles() + " log files");

    // flush all regions
    List<HRegion> regions =
        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }

    // Now roll the log
    log.rollWriter();

    int count = ((FSHLog) log).getNumRolledLogFiles();
    LOG.info("after flushing all regions and rolling logs there are " +
        ((FSHLog) log).getNumRolledLogFiles() + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

  private static String getName() {
    return "TestLogRolling";
  }

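  /**
   * Writes a single row, then sleeps briefly so the log roller can run if it needs to.
   */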
  void writeData(HTable table, int rownum) throws IOException {
    doPut(table, rownum);

    // sleep to let the log roller run (if it needs to)
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // continue
    }
  }

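  /**
   * Reads back the row written by {@link #doPut(HTable, int)} and asserts it holds the
   * expected value.
   */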
  void validateData(HTable table, int rownum) throws IOException {
    String row = "row" + String.format("%1$04d", rownum);
    Get get = new Get(Bytes.toBytes(row));
    get.addFamily(HConstants.CATALOG_FAMILY);
    Result result = table.get(get);
    assertTrue(result.size() == 1);
    assertTrue(Bytes.equals(value,
        result.getValue(HConstants.CATALOG_FAMILY, null)));
    LOG.info("Validated row " + row);
  }

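  /**
   * Writes ten rows starting at {@code start}, then keeps issuing puts until the WAL's
   * low-replication-roll flag matches {@code expect} or {@code timeout} milliseconds elapse.
   * The extra puts force the WAL to re-check its current replication.
   */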
  void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
      throws IOException {
    for (int i = 0; i < 10; i++) {
      Put put = new Put(Bytes.toBytes("row"
          + String.format("%1$04d", (start + i))));
      put.add(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
    }
    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
    tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
    long startTime = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0) {
      if (log.isLowReplicationRollEnabled() == expect) {
        break;
      } else {
        // Trigger calling FSHLog#checkLowReplication()
        table.put(tmpPut);
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // continue
        }
        remaining = timeout - (System.currentTimeMillis() - startTime);
      }
    }
  }

  /**
   * Give me the HDFS pipeline for this log file
   */
  DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
      IllegalAccessException, InvocationTargetException {
    OutputStream stm = ((FSHLog) log).getOutputStream();
    Method getPipeline = null;
    for (Method m : stm.getClass().getDeclaredMethods()) {
      if (m.getName().endsWith("getPipeline")) {
        getPipeline = m;
        getPipeline.setAccessible(true);
        break;
      }
    }

    assertTrue("Need DFSOutputStream.getPipeline() for this test",
        null != getPipeline);
    Object repl = getPipeline.invoke(stm, new Object[] {} /* NO_ARGS */);
    return (DatanodeInfo[]) repl;
  }

  /**
   * Tests that logs are rolled upon detecting datanode death.
   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws Exception {
    assertTrue("This test requires HLog file replication set to 2.",
      fs.getDefaultReplication() == 2);
    LOG.info("Replication=" + fs.getDefaultReplication());

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    assertTrue(table.isAutoFlush());

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);

    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void logRollRequested(boolean lowReplication) {
        if (lowReplication) {
          lowReplicationHookCalled.lazySet(true);
        }
      }

      @Override
      public void preLogRoll(Path oldPath, Path newPath) throws IOException { }

      @Override
      public void postLogRoll(Path oldPath, Path newPath) throws IOException { }

      @Override
      public void preLogArchive(Path oldPath, Path newPath) throws IOException { }

      @Override
      public void postLogArchive(Path oldPath, Path newPath) throws IOException { }

      @Override
      public void logCloseRequested() { }

      @Override
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) { }

      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { }
    });

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    // Bring up extra datanodes to ensure proper replication when we kill one.
    // This call is synchronous; when it returns, the dfs cluster is active.
    // We start 3 new datanodes and then stop the original ones to avoid a directory
    // naming conflict when we stop/start a namenode later, as mentioned in HBASE-5163.
    List<DataNode> existingNodes = dfsCluster.getDataNodes();
    int numDataNodes = 3;
    dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true,
        null, null);
    List<DataNode> allNodes = dfsCluster.getDataNodes();
    for (int i = allNodes.size() - 1; i >= 0; i--) {
      if (existingNodes.contains(allNodes.get(i))) {
        dfsCluster.stopDataNode(i);
      }
    }

    assertTrue("DataNodes " + dfsCluster.getDataNodes().size() +
        " default replication " + fs.getDefaultReplication(),
      dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);

    writeData(table, 2);

    long curTime = System.currentTimeMillis();
    long oldFilenum = ((FSHLog) log).getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet",
      oldFilenum == ((FSHLog) log).getFilenum());
    final DatanodeInfo[] pipeline = getPipeline(log);
    assertTrue(pipeline.length == fs.getDefaultReplication());

    // kill a datanode in the pipeline to force a log roll on the next sync()
    // This call is synchronous; when it returns, the node is dead.
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);

    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = ((FSHLog) log).getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);

    assertTrue("The log rolling hook should have been called with the low replication flag",
      lowReplicationHookCalled.get());

    // write some more log data (this should use a new hdfs_out)
    writeData(table, 3);
    assertTrue("The log should not roll again.",
      ((FSHLog) log).getFilenum() == newFilenum);
    // kill another datanode in the pipeline, so the replica count drops below
    // the configured minimum of 2.
    assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);

    batchWriteAndWait(table, 3, false, 14000);
    assertTrue("LowReplication Roller should've been disabled, current replication="
            + ((FSHLog) log).getLogReplication(),
        !log.isLowReplicationRollEnabled());

    dfsCluster
        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);

    // Force roll writer. The new log file will have the default replication,
    // and the LowReplication Roller will be enabled.
    log.rollWriter(true);
    batchWriteAndWait(table, 13, true, 10000);
    assertTrue("New log file should have the default replication instead of " +
      ((FSHLog) log).getLogReplication(),
      ((FSHLog) log).getLogReplication() == fs.getDefaultReplication());
    assertTrue("LowReplication Roller should've been enabled",
        log.isLowReplicationRollEnabled());
  }

  /**
   * Test that HLog is rolled when all data nodes in the pipeline have been
   * restarted.
   * @throws Exception
   */
  @Test
  public void testLogRollOnPipelineRestart() throws Exception {
    LOG.info("Starting testLogRollOnPipelineRestart");
    assertTrue("This test requires HLog file replication.",
      fs.getDefaultReplication() > 1);
    LOG.info("Replication=" + fs.getDefaultReplication());
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    final List<Path> paths = new ArrayList<Path>();
    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
    paths.add(((FSHLog) log).computeFilename());
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void preLogRoll(Path oldFile, Path newFile) {
        LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
        preLogRolledCalled.add(new Integer(1));
      }
      @Override
      public void postLogRoll(Path oldFile, Path newFile) {
        paths.add(newFile);
      }
      @Override
      public void preLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void postLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void logRollRequested(boolean tooFewReplicas) {}
      @Override
      public void logCloseRequested() {}
      @Override
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {}
      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
          WALEdit logEdit) {}
    });

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    writeData(table, 1002);

    table.setAutoFlush(true, true);

    long curTime = System.currentTimeMillis();
    long oldFilenum = log.getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());

    // roll all datanodes in the pipeline
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1002);

    // this write should succeed, but trigger a log roll
    writeData(table, 1003);
    long newFilenum = log.getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);
    validateData(table, 1003);

    writeData(table, 1004);

    // roll all datanodes again
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1004);

    // this write should succeed, but trigger a log roll
    writeData(table, 1005);

    // force a log roll to read back and verify previously written logs
    log.rollWriter(true);
    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
        preLogRolledCalled.size() >= 1);

    // read back the data written
    Set<String> loggedRows = new HashSet<String>();
    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
    for (Path p : paths) {
      LOG.debug("recovering lease for " + p);
      fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p,
          TEST_UTIL.getConfiguration(), null);

      LOG.debug("Reading HLog " + FSUtils.getPath(p));
      HLog.Reader reader = null;
      try {
        reader = HLogFactory.createReader(fs, p,
            TEST_UTIL.getConfiguration());
        HLog.Entry entry;
        while ((entry = reader.next()) != null) {
          LOG.debug("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues());
          for (KeyValue kv : entry.getEdit().getKeyValues()) {
            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
          }
        }
      } catch (EOFException e) {
        LOG.debug("EOF reading file " + FSUtils.getPath(p));
      } finally {
        if (reader != null) reader.close();
      }
    }

    // verify the written rows are there
    assertTrue(loggedRows.contains("row1002"));
    assertTrue(loggedRows.contains("row1003"));
    assertTrue(loggedRows.contains("row1004"));
    assertTrue(loggedRows.contains("row1005"));

    // flush all regions
    List<HRegion> regions =
        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }

    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (int i = 2; i <= 5; i++) {
        Result r = scanner.next();
        assertNotNull(r);
        assertFalse(r.isEmpty());
        assertEquals("row100" + i, Bytes.toString(r.getRow()));
      }
    } finally {
      scanner.close();
    }

    // verify that no region servers aborted
    for (JVMClusterUtil.RegionServerThread rsThread:
        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
      assertFalse(rsThread.getRegionServer().isAborted());
    }
  }

  /**
   * Tests that logs are deleted when some region has a compaction
   * record in WAL and no other records. See HBASE-8597.
   */
  @Test
  public void testCompactionRecordDoesntBlockRolling() throws Exception {
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    String tableName = getName();
    HTable table = createTestTable(tableName);
    String tableName2 = tableName + "1";
    HTable table2 = createTestTable(tableName2);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    FSHLog fshLog = (FSHLog) log;
    HRegion region = server.getOnlineRegions(table2.getName()).get(0);
    Store s = region.getStore(HConstants.CATALOG_FAMILY);

    // Have to flush the namespace table to ensure it doesn't affect the WAL counts below.
    admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());

    // Put some stuff into table2, to make sure we have some files to compact.
    for (int i = 1; i <= 2; ++i) {
      doPut(table2, i);
      admin.flush(table2.getTableName());
    }
    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
    assertEquals(2, s.getStorefilesCount());

    // Roll the log and compact table2, to have compaction record in the 2nd WAL.
    fshLog.rollWriter();
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
    admin.flush(table2.getTableName());
    region.compactStores();
    // Wait for compaction in case the flush triggered it before us.
    Assert.assertNotNull(s);
    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
      Threads.sleepWithoutInterrupt(200);
    }
    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

    // Write some value to the table so the WAL cannot be deleted until table is flushed.
    doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
    fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());

    // Flush table to make latest WAL obsolete; write another record, and roll again.
    admin.flush(table.getTableName());
    doPut(table, 1);
    fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
    assertEquals("Should have 1 WAL at the end", 1, fshLog.getNumRolledLogFiles());

    table.close();
    table2.close();
  }

  private void doPut(HTable table, int i) throws IOException {
    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
    put.add(HConstants.CATALOG_FAMILY, null, value);
    table.put(put);
  }

  private HTable createTestTable(String tableName) throws IOException {
    // Create the test table and open it
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc);
    return new HTable(TEST_UTIL.getConfiguration(), tableName);
  }
}