/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.*;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/** JUnit test case for HLog */
@Category(LargeTests.class)
@SuppressWarnings("deprecation")
public class TestHLog {
  private static final Log LOG = LogFactory.getLog(TestHLog.class);
  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    //    .getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  private static Configuration conf;
  private static FileSystem fs;
  private static Path dir;
  private static MiniDFSCluster cluster;
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Path hbaseDir;
  private static Path oldLogDir;

  @Before
  public void setUp() throws Exception {
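    // Wipe everything under the DFS root so each test starts from an empty namespace.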
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @After
  public void tearDown() throws Exception {
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000);
    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        SampleRegionWALObserver.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
    oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    dir = new Path(hbaseDir, getName());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  private static String getName() {
    return "TestHLog";
  }

  /**
   * Write to a log file with three concurrent threads and verify all data is written.
   * @throws Exception
   */
  @Test
  public void testConcurrentWrites() throws Exception {
    // Run the HPE tool with three threads writing 3000 edits each concurrently.
    // When done, verify that all edits were written.
    int errCode = HLogPerformanceEvaluation.
      innerMain(new Configuration(TEST_UTIL.getConfiguration()),
        new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
    assertEquals(0, errCode);
  }

  /**
   * Just write multiple logs then split.  Before the fix for HADOOP-2283, this
   * would fail.
   * @throws IOException
   */
  @Test
  public void testSplit() throws IOException {
    final TableName tableName =
        TableName.valueOf(getName());
    final byte [] rowName = tableName.getName();
    Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        HConstants.HREGION_LOGDIR_NAME, conf);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[3];
    Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = new HRegionInfo(tableName,
                Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
      LOG.info("Created region dir " + new Path(tabledir, infos[i].getEncodedName()).toString());
    }
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("column"));

    // Add edits for three regions.
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      for (int ii = 0; ii < howmany; ii++) {
        for (int i = 0; i < howmany; i++) {
          for (int j = 0; j < howmany; j++) {
            WALEdit edit = new WALEdit();
            byte [] family = Bytes.toBytes("column");
            byte [] qualifier = Bytes.toBytes(Integer.toString(j));
            byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
            edit.add(new KeyValue(rowName, family, qualifier,
                System.currentTimeMillis(), column));
            LOG.info("Region " + i + ": " + edit);
            log.append(infos[i], tableName, edit,
              System.currentTimeMillis(), htd, sequenceId);
          }
        }
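        // Roll the writer after each round so the edits spread across several log files.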
        log.rollWriter();
      }
      log.close();
      List<Path> splits = HLogSplitter.split(
        hbaseDir, logdir, oldLogDir, fs, conf);
      verifySplits(splits, howmany);
      log = null;
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
    }
  }

  /**
   * Test new HDFS-265 sync.
   * @throws Exception
   */
  @Test
  public void Broken_testSync() throws Exception {
    TableName tableName =
        TableName.valueOf(getName());
    // First verify that using streams all works.
    Path p = new Path(dir, getName() + ".fsdos");
    FSDataOutputStream out = fs.create(p);
    out.write(tableName.getName());
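    // Look up the flush method reflectively: hflush() on newer Hadoop versions,
    // sync() on older ones.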
    Method syncMethod = null;
    try {
      syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
    } catch (NoSuchMethodException e) {
      try {
        syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
      } catch (NoSuchMethodException ex) {
        fail("This version of Hadoop supports neither Syncable.sync() " +
            "nor Syncable.hflush().");
      }
    }
    syncMethod.invoke(out, new Object[]{});
    FSDataInputStream in = fs.open(p);
    assertTrue(in.available() > 0);
    byte [] buffer = new byte [1024];
    int read = in.read(buffer);
    assertEquals(tableName.getName().length, read);
    out.close();
    in.close();

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;
    HLog.Reader reader = null;

    try {
      HRegionInfo info = new HRegionInfo(tableName,
          null, null, false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor(tableName.getName()));

      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      // Now call sync and try reading.  Opening a Reader before you sync just
      // gives you an EOFException.
      wal.sync();
      // Open a Reader.
      Path walPath = ((FSHLog) wal).computeFilename();
      reader = HLogFactory.createReader(fs, walPath, conf);
      int count = 0;
      HLog.Entry entry = new HLog.Entry();
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total, count);
      reader.close();
      // Check that an open of a Reader works on a file that has had a sync
      // done on it.
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertTrue(count >= total);
      reader.close();
      // If I sync, should see double the edits.
      wal.sync();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 2, count);
      // Now test that things work when we go over a block boundary,
      // especially that we return the right length for the file.
      final byte [] value = new byte[1025 * 1024];  // Make a value a little over 1MB.
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
        wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
      }
      // Now I should have written out lots of blocks.  Sync then read.
      wal.sync();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
      // Close the WAL and ensure the Reader still gets the right length.
      wal.close();
      reader = HLogFactory.createReader(fs, walPath, conf);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
    } finally {
      if (wal != null) wal.closeAndDelete();
      if (reader != null) reader.close();
    }
  }

  private void verifySplits(List<Path> splits, final int howmany)
      throws IOException {
    assertEquals(howmany * howmany, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      LOG.info("Verifying=" + splits.get(i));
      HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
      try {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        HLog.Entry entry = new HLog.Entry();
        while ((entry = reader.next(entry)) != null) {
          HLogKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          // Assert that all edits are for the same region.
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
          assertTrue(seqno < key.getLogSeqNum());
          seqno = key.getLogSeqNum();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany, count);
      } finally {
        reader.close();
      }
    }
  }

  /*
   * We pass different values to recoverFileLease() so that different code paths are covered.
   *
   * For this test to pass, it requires:
   * 1. HDFS-200 (append support)
   * 2. HDFS-988 (SafeMode should freeze file operations
   *              [FSNamesystem.nextGenerationStampForBlock])
   * 3. HDFS-142 (on restart, maintain pendingCreates)
   */
  @Test (timeout=300000)
  public void testAppendClose() throws Exception {
    TableName tableName =
        TableName.valueOf(getName());
    HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);

    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
        "hlogdir_archive", conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;

    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(tableName.getName()));

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
    }
    // Now call sync to send the data to HDFS datanodes.
    wal.sync();
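    // Remember the namenode port so the cluster can be restarted on the same
    // address below.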
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = ((FSHLog) wal).computeFilename();

    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
    try {
      DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
      dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
      TEST_UTIL.shutdownMiniDFSCluster();
      try {
        // wal.writer.close() will throw an exception,
        // but still call this since it closes the LogSyncer thread first
        wal.close();
      } catch (IOException e) {
        LOG.info(e);
      }
      fs.close(); // closing FS last so DFSOutputStream can't call close
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      // Restart the cluster
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }
      assertFalse(cluster.isClusterUp());
      cluster = null;
      for (int i = 0; i < 100; i++) {
        try {
          cluster = TEST_UTIL.startMiniDFSClusterForTestHLog(namenodePort);
          break;
        } catch (BindException e) {
          LOG.info("Sleeping.  BindException bringing up new cluster");
          Threads.sleep(1000);
        }
      }
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("STARTED second instance.");
    }

    // Set the lease period to be 1 second so that the
    // namenode triggers lease recovery upon append request.
    Method setLeasePeriod = cluster.getClass()
      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster, 1000L, 1000L);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.info(e);
    }

    // Now try recovering the log, like the HMaster would do
    final FileSystem recoveredFs = fs;
    final Configuration rlConf = conf;

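    // Run lease recovery on its own thread so the test can time out and fail
    // instead of hanging if recovery never completes.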
    class RecoverLogThread extends Thread {
      public Exception exception = null;
      public void run() {
        try {
          FSUtils.getInstance(fs, rlConf)
            .recoverFileLease(recoveredFs, walPath, rlConf, null);
        } catch (IOException e) {
          exception = e;
        }
      }
    }

    RecoverLogThread t = new RecoverLogThread();
    t.start();
    // Timeout after 60 sec. Without the correct patches, this would be an infinite loop.
    t.join(60 * 1000);
    if (t.isAlive()) {
      t.interrupt();
      throw new Exception("Timed out waiting for HLog.recoverLog()");
    }

    if (t.exception != null) {
      throw t.exception;
    }

    // Make sure you can read all the content
    HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
    int count = 0;
    HLog.Entry entry = new HLog.Entry();
    while (reader.next(entry) != null) {
      count++;
      assertTrue("Should be one KeyValue per WALEdit",
                  entry.getEdit().getKeyValues().size() == 1);
    }
    assertEquals(total, count);
    reader.close();

    // Reset the lease period
    setLeasePeriod.invoke(cluster, 60000L, 3600000L);
  }

  /**
   * Tests that we can write out an edit, close, and then read it back in again.
   * @throws IOException
   */
  @Test
  public void testEditAdd() throws IOException {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog.Reader reader = null;
    HLog log = null;
    try {
      log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
      final AtomicLong sequenceId = new AtomicLong(1);

      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo info = new HRegionInfo(tableName,
        row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));

      log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      log.startCacheFlush(info.getEncodedNameAsBytes());
      log.completeCacheFlush(info.getEncodedNameAsBytes());
      log.close();
      Path filename = ((FSHLog) log).computeFilename();
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLogFactory.createReader(fs, filename, conf);
      // Above we added all columns on a single row so we only read one
      // entry in the below... that's why we have '1'.
      for (int i = 0; i < 1; i++) {
        HLog.Entry entry = reader.next(null);
        if (entry == null) break;
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(tableName.equals(key.getTablename()));
        KeyValue kv = val.getKeyValues().get(0);
        assertTrue(Bytes.equals(row, kv.getRow()));
        assertEquals((byte)(i + '0'), kv.getValue()[0]);
        System.out.println(key + " " + val);
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Tests that a multi-column edit appended to the log can be read back intact.
   * @throws IOException
   */
  @Test
  public void testAppend() throws IOException {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    Reader reader = null;
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));
      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      log.startCacheFlush(hri.getEncodedNameAsBytes());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.close();
      Path filename = ((FSHLog) log).computeFilename();
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLogFactory.createReader(fs, filename, conf);
      HLog.Entry entry = reader.next();
      assertEquals(COL_COUNT, entry.getEdit().size());
      int idx = 0;
      for (KeyValue val : entry.getEdit().getKeyValues()) {
        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
          entry.getKey().getEncodedRegionName()));
        assertTrue(tableName.equals(entry.getKey().getTablename()));
        assertTrue(Bytes.equals(row, val.getRow()));
        assertEquals((byte)(idx + '0'), val.getValue()[0]);
        System.out.println(entry.getKey() + " " + val);
        idx++;
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Test that we can visit entries before they are appended.
   * @throws Exception
   */
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      DumbWALActionsListener visitor = new DumbWALActionsListener();
      log.registerWALActionsListener(visitor);
      long timestamp = System.currentTimeMillis();
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor("column"));

      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      for (int i = 0; i < COL_COUNT; i++) {
        WALEdit cols = new WALEdit();
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[]{(byte) (i + '0')}));
        log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      }
      assertEquals(COL_COUNT, visitor.increments);
      log.unregisterWALActionsListener(visitor);
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(11)),
          timestamp, new byte[]{(byte) (11 + '0')}));
      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
      assertEquals(COL_COUNT, visitor.increments);
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

  @Test
  public void testLogCleaning() throws Exception {
    LOG.info("testLogCleaning");
    final TableName tableName =
        TableName.valueOf("testLogCleaning");
    final TableName tableName2 =
        TableName.valueOf("testLogCleaning2");

    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        getName(), conf);
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HRegionInfo hri2 = new HRegionInfo(tableName2,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

      // Add a single edit and make sure that rolling won't remove the file.
      // Before HBASE-3198 it used to delete it.
      addEdits(log, hri, tableName, 1, sequenceId);
      log.rollWriter();
      assertEquals(1, ((FSHLog) log).getNumRolledLogFiles());

      // See if there's anything wrong with more than 1 edit
      addEdits(log, hri, tableName, 2, sequenceId);
      log.rollWriter();
      assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());

      // Now mix edits from 2 regions, still no flushing
      addEdits(log, hri, tableName, 1, sequenceId);
      addEdits(log, hri2, tableName2, 1, sequenceId);
      addEdits(log, hri, tableName, 1, sequenceId);
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.rollWriter();
      assertEquals(3, ((FSHLog) log).getNumRolledLogFiles());

      // Flush the first region, we expect to see the first two files getting
      // archived. We need to append something or the writer won't be rolled.
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.startCacheFlush(hri.getEncodedNameAsBytes());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.rollWriter();
      assertEquals(2, ((FSHLog) log).getNumRolledLogFiles());

      // Flush the second region, which removes all the remaining output files
      // since the oldest was completely flushed and the two others only contain
      // flush information
      addEdits(log, hri2, tableName2, 1, sequenceId);
      log.startCacheFlush(hri2.getEncodedNameAsBytes());
      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
      log.rollWriter();
      assertEquals(0, ((FSHLog) log).getNumRolledLogFiles());
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

  @Test
  public void testFailedToCreateHLogIfParentRenamed() throws IOException {
    FSHLog log = (FSHLog) HLogFactory.createHLog(
      fs, hbaseDir, "testFailedToCreateHLogIfParentRenamed", conf);
    long filenum = System.currentTimeMillis();
    Path path = log.computeFilename(filenum);
    HLogFactory.createWALWriter(fs, path, conf);
    Path parent = path.getParent();
    path = log.computeFilename(filenum + 1);
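    // Renaming the WAL directory to a "-splitting" suffix mimics the master
    // taking over the logs of a dead server; creating a new WAL under the
    // old path must then fail.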
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    fs.rename(parent, newPath);
    try {
      HLogFactory.createWALWriter(fs, path, conf);
      fail("It should fail to create the new WAL");
    } catch (IOException ioe) {
      // expected, good.
    }
  }

  @Test
  public void testGetServerNameFromHLogDirectoryName() throws IOException {
    ServerName sn = ServerName.valueOf("hn", 450, 1398);
    String hl = FSUtils.getRootDir(conf) + "/" + HLogUtil.getHLogDirectoryName(sn.toString());

    // Must not throw exception
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, null));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf,
        FSUtils.getRootDir(conf).toUri().toString()));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, ""));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, "                  "));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "qdf"));
    Assert.assertNull(HLogUtil.getServerNameFromHLogDirectoryName(conf, "sfqf" + hl + "qdf"));

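    // Server names inside WAL paths are URL-encoded, e.g. ',' becomes %2C.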
    final String wals = "/WALs/";
    ServerName parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
      FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
      "/localhost%2C32984%2C1343316388997.1343316390417");
    Assert.assertEquals("standard", sn, parsed);

    parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf, hl + "/qdf");
    Assert.assertEquals("subdir", sn, parsed);

    parsed = HLogUtil.getServerNameFromHLogDirectoryName(conf,
      FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
      "-splitting/localhost%3A57020.1340474893931");
    Assert.assertEquals("split", sn, parsed);
  }

  /**
   * A loaded WAL coprocessor won't break existing HLog test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    HLog log = HLogFactory.createHLog(fs, hbaseDir,
        getName(), conf);
    try {
      WALCoprocessorHost host = log.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
      assertNotNull(c);
    } finally {
      if (log != null) log.closeAndDelete();
    }
  }

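  /**
   * Appends the given number of edits for the region to the log, one
   * single-KeyValue WALEdit per append, using the passed sequence id counter.
   */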
  private void addEdits(HLog log, HRegionInfo hri, TableName tableName,
                        int times, AtomicLong sequenceId) throws IOException {
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor("row"));

    final byte [] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      log.append(hri, tableName, cols, timestamp, htd, sequenceId);
    }
  }

  /**
   * Writes a log in the legacy (pre-protobuf) SequenceFile format and verifies
   * that the standard reader can read it back.
   * @throws IOException
   */
  @Test
  public void testReadLegacyLog() throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "temphlog");
    SequenceFileLogWriter sflw = null;
    HLog.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);
      // Write log in pre-PB format.
      sflw = new SequenceFileLogWriter();
      sflw.init(fs, path, conf, false);
      for (int i = 0; i < recordCount; ++i) {
        HLogKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        sflw.append(new HLog.Entry(key, edit));
      }
      sflw.sync();
      sflw.close();

      // Now read the log using standard means.
      reader = HLogFactory.createReader(fs, path, conf);
      assertTrue(reader instanceof SequenceFileLogReader);
      for (int i = 0; i < recordCount; ++i) {
        HLog.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (KeyValue val : entry.getEdit().getKeyValues()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      HLog.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (sflw != null) {
        sflw.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Reads the WAL with and without a WALTrailer.
   * @throws IOException
   */
  @Test
  public void testWALTrailer() throws IOException {
    // read with trailer.
    doRead(true);
    // read without trailer.
    doRead(false);
  }

  /**
   * Appends entries to the WAL and reads them back.
   * @param withTrailer If true, the WAL writer is closed before reading, so a
   *          trailer is appended to the WAL. Otherwise, reading starts right
   *          after the sync call, which means the reader is not aware of the
   *          trailer. In that scenario, if the reader tries to read the trailer
   *          in its next() call, ProtobufLogReader returns false.
   * @throws IOException
   */
  private void doRead(boolean withTrailer) throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "temphlog");
    // delete the log if it already exists, for test only
    fs.delete(path, true);
    HLog.Writer writer = null;
    HLog.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);
      // Write log in pb format.
      writer = HLogFactory.createWALWriter(fs, path, conf);
      for (int i = 0; i < recordCount; ++i) {
        HLogKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        writer.append(new HLog.Entry(key, edit));
      }
      writer.sync();
      if (withTrailer) writer.close();

      // Now read the log using standard means.
      reader = HLogFactory.createReader(fs, path, conf);
      assertTrue(reader instanceof ProtobufLogReader);
      if (withTrailer) {
        assertNotNull(reader.getWALTrailer());
      } else {
        assertNull(reader.getWALTrailer());
      }
      for (int i = 0; i < recordCount; ++i) {
        HLog.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (KeyValue val : entry.getEdit().getKeyValues()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      HLog.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (writer != null) {
        writer.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Tests the log comparator. Ensures that we are not mixing meta logs with
   * non-meta logs (an exception is thrown if we do). Comparison is based on
   * the timestamp present in the WAL name.
   * @throws Exception
   */
  @Test
  public void testHLogComparator() throws Exception {
    HLog hlog1 = null;
    HLog hlogMeta = null;
    try {
      hlog1 = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
      LOG.debug("Log obtained is: " + hlog1);
      Comparator<Path> comp = ((FSHLog) hlog1).LOG_NAME_COMPARATOR;
      Path p1 = ((FSHLog) hlog1).computeFilename(11);
      Path p2 = ((FSHLog) hlog1).computeFilename(12);
      // comparing a path with itself returns 0
      assertTrue(comp.compare(p1, p1) == 0);
      // comparing paths with different filenums.
      assertTrue(comp.compare(p1, p2) < 0);
      hlogMeta = HLogFactory.createMetaHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf,
        null, null);
      Comparator<Path> compMeta = ((FSHLog) hlogMeta).LOG_NAME_COMPARATOR;

      Path p1WithMeta = ((FSHLog) hlogMeta).computeFilename(11);
      Path p2WithMeta = ((FSHLog) hlogMeta).computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
      // mixing meta and non-meta logs gives an error
      boolean ex = false;
      try {
        comp.compare(p1WithMeta, p2);
      } catch (Exception e) {
        ex = true;
      }
      assertTrue("Comparator doesn't complain while checking meta log files", ex);
      boolean exMeta = false;
      try {
        compMeta.compare(p1WithMeta, p2);
      } catch (Exception e) {
        exMeta = true;
      }
      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
    } finally {
      if (hlog1 != null) hlog1.close();
      if (hlogMeta != null) hlogMeta.close();
    }
  }

  /**
   * Tests WAL archiving by adding data, doing flushing/rolling and checking that we
   * archive old logs but don't archive "live logs" (that is, logs with un-flushed
   * entries).
   * <p>
   * This is what it does: it creates two regions and does a series of inserts
   * along with log rolling. Whenever a WAL is rolled, FSHLog checks previous WALs
   * for archiving. A WAL is eligible for archiving if all the regions that have
   * entries in that WAL file have flushed past their maximum sequence id in that
   * WAL file.
   * @throws IOException
   */
  @Test
  public void testWALArchiving() throws IOException {
    LOG.debug("testWALArchiving");
    TableName table1 = TableName.valueOf("t1");
    TableName table2 = TableName.valueOf("t2");
    HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
    try {
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
          HConstants.EMPTY_END_ROW);
      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
          HConstants.EMPTY_END_ROW);
      // ensure that we don't split the regions.
      hri1.setSplit(false);
      hri2.setSplit(false);
      // variables to mock region sequenceIds.
      final AtomicLong sequenceId1 = new AtomicLong(1);
      final AtomicLong sequenceId2 = new AtomicLong(1);
      // start with the testing logic: insert a waledit, and roll writer
      addEdits(hlog, hri1, table1, 1, sequenceId1);
      hlog.rollWriter();
      // assert that the wal is rolled
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits in the second wal file, and roll writer.
      addEdits(hlog, hri1, table1, 1, sequenceId1);
      hlog.rollWriter();
      // assert that the wal is rolled
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
      // add a waledit to table1, and flush the region.
      addEdits(hlog, hri1, table1, 3, sequenceId1);
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      // roll log; all old logs should be archived.
      hlog.rollWriter();
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      // add an edit to table2, and roll writer
      addEdits(hlog, hri2, table2, 1, sequenceId2);
      hlog.rollWriter();
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits for table1, and roll writer
      addEdits(hlog, hri1, table1, 2, sequenceId1);
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits for table2, and flush hri1.
      addEdits(hlog, hri2, table2, 2, sequenceId2);
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      // the log : region-sequenceId map is
      // log1: region2 (unflushed)
      // log2: region1 (flushed)
      // log3: region2 (unflushed)
      // roll the writer; log2 should be archived.
      hlog.rollWriter();
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());
      // flush region2, and all logs should be archived.
      addEdits(hlog, hri2, table2, 2, sequenceId2);
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter();
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
    } finally {
      if (hlog != null) hlog.close();
    }
  }

  /**
   * On rolling a WAL after reaching the threshold, {@link HLog#rollWriter()} returns the list of
   * regions which should be flushed in order to archive the oldest WAL file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the WAL enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
   * rolling the WAL.
   * @throws Exception
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(conf);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1);
    TableName t1 = TableName.valueOf("t1");
    TableName t2 = TableName.valueOf("t2");
    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    // variables to mock region sequenceIds
    final AtomicLong sequenceId1 = new AtomicLong(1);
    final AtomicLong sequenceId2 = new AtomicLong(1);
    // add edits and roll the wal
    try {
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      // add some more edits and roll the wal. This reaches the log number threshold.
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      // with the above rollWriter call, the max logs limit is reached.
      assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles());

      // get the regions to flush; since there is only one region in the oldest wal, it should
      // return only one region.
      byte[][] regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // insert edits in the second region
      addEdits(hlog, hri2, t2, 2, sequenceId2);
      // get the regions to flush; it should still return region1.
      regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // flush region 1, and roll the wal file. Only the last wal, which has entries for region1,
      // should remain.
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      // only one wal should remain now (that is for the second region).
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      // flush the second region
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter(true);
      // no wal should remain now.
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      // add edits both to region 1 and region 2, and roll.
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      addEdits(hlog, hri2, t2, 2, sequenceId2);
      hlog.rollWriter();
      // add edits and roll the writer, to reach the max logs limit.
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      hlog.rollWriter();
      // it should return two regions to flush, as the oldest wal file has entries
      // for both regions.
      regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
      assertEquals(2, regionsToFlush.length);
      // flush both regions
      flushRegion(hlog, hri1.getEncodedNameAsBytes());
      flushRegion(hlog, hri2.getEncodedNameAsBytes());
      hlog.rollWriter(true);
      assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles());
      // Add an edit to region1, and roll the wal.
      addEdits(hlog, hri1, t1, 2, sequenceId1);
      // tests partial flush: roll on a partial flush, and ensure that the wal is not archived.
      hlog.startCacheFlush(hri1.getEncodedNameAsBytes());
      hlog.rollWriter();
      hlog.completeCacheFlush(hri1.getEncodedNameAsBytes());
      assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles());
    } finally {
      if (hlog != null) hlog.close();
    }
  }

  /**
   * Simulates HLog append ops for a region and tests the
   * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
   * It compares the region sequenceIds with the oldestFlushing and oldestUnFlushed entries.
   * If a region's entries are larger than the min of (oldestFlushing, oldestUnFlushed), then
   * the region should be flushed before archiving this WAL.
   */
  @Test
  public void testAllRegionsFlushed() {
    LOG.debug("testAllRegionsFlushed");
    Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
    Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
    Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
    // create a table
    TableName t1 = TableName.valueOf("t1");
    // create a region
    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    // variables to mock region sequenceIds
    final AtomicLong sequenceId1 = new AtomicLong(1);
    // test empty map
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // add entries in the region
    seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
    // should say region1 is not flushed.
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // test with entries in the oldestFlushing map.
    oldestUnFlushedSeqNo.clear();
    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // simulate a region flush, i.e., clear the oldestFlushing and oldestUnflushed maps
    oldestFlushingSeqNo.clear();
    oldestUnFlushedSeqNo.clear();
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
    // insert some large values for region1
    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000L);
    seqNo.put(hri1.getEncodedNameAsBytes(), 1500L);
    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));

    // tests when oldestUnFlushed/oldestFlushing contains a larger value.
    // It means the region has been flushed.
    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200L);
    oldestUnFlushedSeqNo.clear();
    seqNo.put(hri1.getEncodedNameAsBytes(), 1199L);
    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
  }

  /**
   * Helper method to simulate a region flush for a WAL.
   * @param hlog the WAL to flush against
   * @param regionEncodedName encoded name of the flushed region
   */
  private void flushRegion(HLog hlog, byte[] regionEncodedName) {
    hlog.startCacheFlush(regionEncodedName);
    hlog.completeCacheFlush(regionEncodedName);
  }

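  /** WALActionsListener that simply counts how many times visitLogEntryBeforeWrite() fires. */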
  static class DumbWALActionsListener implements WALActionsListener {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
                                         WALEdit logEdit) {
      increments++;
    }

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
      increments++;
    }

    @Override
    public void preLogRoll(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void postLogRoll(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void preLogArchive(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void postLogArchive(Path oldFile, Path newFile) {
      // not interested
    }

    @Override
    public void logRollRequested(boolean tooFewReplicas) {
      // not interested
    }

    @Override
    public void logCloseRequested() {
      // not interested
    }
  }

}