View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.FileNotFoundException;
28  import java.io.IOException;
29  import java.lang.reflect.Method;
30  import java.security.PrivilegedExceptionAction;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableSet;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.fs.FSDataInputStream;
47  import org.apache.hadoop.fs.FSDataOutputStream;
48  import org.apache.hadoop.fs.FileStatus;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.FileUtil;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseTestingUtility;
54  import org.apache.hadoop.hbase.HColumnDescriptor;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.KeyValue;
59  import org.apache.hadoop.hbase.testclassification.LargeTests;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
61  import org.apache.hadoop.hbase.regionserver.HRegion;
62  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
63  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
64  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
65  import org.apache.hadoop.hbase.security.User;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.CancelableProgressable;
68  import org.apache.hadoop.hbase.util.FSUtils;
69  import org.apache.hadoop.hbase.util.Threads;
70  import org.apache.hadoop.hdfs.DFSTestUtil;
71  import org.apache.hadoop.hdfs.DistributedFileSystem;
72  import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
73  import org.apache.hadoop.ipc.RemoteException;
74  import org.junit.After;
75  import org.junit.AfterClass;
76  import org.junit.Assert;
77  import org.junit.Before;
78  import org.junit.BeforeClass;
79  import org.junit.Ignore;
80  import org.junit.Test;
81  import org.junit.experimental.categories.Category;
82  import org.mockito.Mockito;
83  import org.mockito.invocation.InvocationOnMock;
84  import org.mockito.stubbing.Answer;
85  
86  import com.google.common.base.Joiner;
87  import com.google.common.collect.ImmutableList;
88  
89  /**
90   * Testing {@link HLog} splitting code.
91   */
92  @Category(LargeTests.class)
93  public class TestHLogSplit {
{
  // Uncomment the following lines if more verbosity is needed for
  // debugging (see HBASE-12285 for details).
  //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
private static final Log LOG = LogFactory.getLog(TestHLogSplit.class);

/** Cluster configuration; re-read from TEST_UTIL before every test. */
private Configuration conf;
/** File system of the mini DFS cluster; re-read from TEST_UTIL before every test. */
private FileSystem fs;

protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

// HDFS layout shared by every test: root, live WALs, archived WALs, corrupted WALs.
// NOTE: declaration order matters -- TABLEDIR below is built from HBASEDIR and TABLE_NAME.
private static final Path HBASEDIR = new Path("/hbase");
private static final Path HLOGDIR = new Path(HBASEDIR, "hlog");
private static final Path OLDLOGDIR = new Path(HBASEDIR, "hlog.old");
private static final Path CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);

/** Number of WAL files the tests expect generateHLogs(int) to produce. */
private static final int NUM_WRITERS = 10;
/** Entries per writer per region. */
private static final int ENTRIES = 10;

private static final TableName TABLE_NAME = TableName.valueOf("t1");
private static final byte[] FAMILY = "f1".getBytes();
private static final byte[] QUALIFIER = "q1".getBytes();
private static final byte[] VALUE = "v1".getBytes();
private static final String HLOG_FILE_PREFIX = "hlog.dat.";
/** Regions that edits are generated for; reset in setUp and replaced by some tests. */
private static List<String> REGIONS = new ArrayList<String>();
private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
private static final Path TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
/** Fake user that performs the split ("robber") while another ("zombie") keeps writing. */
private static String ROBBER;
private static String ZOMBIE;
/** Group both fake users are mapped to. */
private static String[] GROUP = {"supergroup"};
/** Recovery mode computed from the configuration in setUp. */
private RecoveryMode mode;
129 
/**
 * Corruption strategies handed to {@code corruptHLog(...)} by the tests; each name describes
 * where in the WAL file the damage is applied.
 */
enum Corruptions {
  /** Damage placed at the very start of the file. */
  INSERT_GARBAGE_ON_FIRST_LINE,
  /** Damage placed part-way through the file. */
  INSERT_GARBAGE_IN_THE_MIDDLE,
  /** Extra bytes appended after the valid content. */
  APPEND_GARBAGE,
  /** File cut short. */
  TRUNCATE,
  /** Only the WAL trailer is cut off. */
  TRUNCATE_TRAILER
}
137 
138   @BeforeClass
139   public static void setUpBeforeClass() throws Exception {
140     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), HBASEDIR);
141     TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
142       InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
143     TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
144     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
145     // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
146     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
147     // Create fake maping user to group and set it to the conf.
148     Map<String, String []> u2g_map = new HashMap<String, String []>(2);
149     ROBBER = User.getCurrent().getName() + "-robber";
150     ZOMBIE = User.getCurrent().getName() + "-zombie";
151     u2g_map.put(ROBBER, GROUP);
152     u2g_map.put(ZOMBIE, GROUP);
153     DFSTestUtil.updateConfWithFakeGroupMapping(TEST_UTIL.getConfiguration(), u2g_map);
154     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
155     TEST_UTIL.startMiniDFSCluster(2);
156   }
157 
158   @AfterClass
159   public static void tearDownAfterClass() throws Exception {
160     TEST_UTIL.shutdownMiniDFSCluster();
161   }
162 
163   @Before
164   public void setUp() throws Exception {
165     flushToConsole("Cleaning up cluster for new test\n"
166         + "--------------------------");
167     conf = TEST_UTIL.getConfiguration();
168     fs = TEST_UTIL.getDFSCluster().getFileSystem();
169     FileStatus[] entries = fs.listStatus(new Path("/"));
170     flushToConsole("Num entries in /:" + entries.length);
171     for (FileStatus dir : entries){
172       assertTrue("Deleting " + dir.getPath(), fs.delete(dir.getPath(), true));
173     }
174     // create the HLog directory because recursive log creates are not allowed
175     fs.mkdirs(HLOGDIR);
176     REGIONS.clear();
177     Collections.addAll(REGIONS, "bbb", "ccc");
178     InstrumentedSequenceFileLogWriter.activateFailure = false;
179     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
180         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
181   }
182 
183   @After
184   public void tearDown() throws Exception {
185   }
186 
187   /**
188    * Simulates splitting a WAL out from under a regionserver that is still trying to write it.  Ensures we do not
189    * lose edits.
190    * @throws IOException
191    * @throws InterruptedException
192    */
193   @Test (timeout=300000)
194   public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
195     final AtomicLong counter = new AtomicLong(0);
196     AtomicBoolean stop = new AtomicBoolean(false);
197     // Region we'll write edits too and then later examine to make sure they all made it in.
198     final String region = REGIONS.get(0);
199     Thread zombie = new ZombieLastLogWriterRegionServer(this.conf, counter, stop, region);
200     try {
201       long startCount = counter.get();
202       zombie.start();
203       // Wait till writer starts going.
204       while (startCount == counter.get()) Threads.sleep(1);
205       // Give it a second to write a few appends.
206       Threads.sleep(1000);
207       final Configuration conf2 = HBaseConfiguration.create(this.conf);
208       final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
209       int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
210         @Override
211         public Integer run() throws Exception {
212           FileSystem fs = FileSystem.get(conf2);
213           int expectedFiles = fs.listStatus(HLOGDIR).length;
214           HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf2);
215           Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
216           assertEquals(expectedFiles, logfiles.length);
217           int count = 0;
218           for (Path logfile: logfiles) {
219             count += countHLog(logfile, fs, conf2);
220           }
221           return count;
222         }
223       });
224       LOG.info("zombie=" + counter.get() + ", robber=" + count);
225       assertTrue("The log file could have at most 1 extra log entry, but can't have less. Zombie could write " +
226         counter.get() + " and logfile had only " + count,
227         counter.get() == count || counter.get() + 1 == count);
228     } finally {
229       stop.set(true);
230       zombie.interrupt();
231       Threads.threadDumpingIsAlive(zombie);
232     }
233   }
234 
235   /**
236    * This thread will keep writing to a 'wal' file even after the split process has started.
237    * It simulates a region server that was considered dead but woke up and wrote some more to he last log entry.
238    * Does its writing as an alternate user in another filesystem instance to simulate better it being a regionserver.
239    */
240   static class ZombieLastLogWriterRegionServer extends Thread {
241     final AtomicLong editsCount;
242     final AtomicBoolean stop;
243     // final User user;
244     /**
245      * Region to write edits for.
246      */
247     final String region;
248     final Configuration conf;
249     final User user;
250 
251     public ZombieLastLogWriterRegionServer(final Configuration conf, AtomicLong counter, AtomicBoolean stop,
252         final String region)
253     throws IOException, InterruptedException {
254       super("ZombieLastLogWriterRegionServer");
255       setDaemon(true);
256       this.stop = stop;
257       this.editsCount = counter;
258       this.region = region;
259       this.conf = HBaseConfiguration.create(conf);
260       this.user = User.createUserForTesting(this.conf, ZOMBIE, GROUP);
261     }
262 
263     @Override
264     public void run() {
265       try {
266         doWriting();
267       } catch (IOException e) {
268         LOG.warn(getName() + " Writer exiting " + e);
269       } catch (InterruptedException e) {
270         LOG.warn(getName() + " Writer exiting " + e);
271       }
272     }
273 
274     private void doWriting() throws IOException, InterruptedException {
275       this.user.runAs(new PrivilegedExceptionAction<Object>() {
276         @Override
277         public Object run() throws Exception {
278           // Index of the WAL we want to keep open.  generateHLogs will leave open the WAL whose index we supply here.
279           int walToKeepOpen = 2;
280           // How many files to write.
281           final int numOfWriters = walToKeepOpen + 1;
282           // The below method writes numOfWriters files each with ENTRIES entries for a total of numOfWriters * ENTRIES
283           // added per column family in the region.
284           HLog.Writer[] writers = null;
285           try {
286             DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.get(conf);
287             writers = generateHLogs(dfs, numOfWriters, ENTRIES, walToKeepOpen);
288           } catch (IOException e1) {
289             throw new RuntimeException("Failed", e1);
290           }
291           // Update counter so has all edits written so far.
292           editsCount.addAndGet(numOfWriters * NUM_WRITERS);
293           // This WAL should be open still after our call to generateHLogs -- we asked it leave it open.
294           HLog.Writer writer = writers[walToKeepOpen];
295           loop(writer);
296           return null;
297         }
298       });
299     }
300 
301     private void loop(final HLog.Writer writer) {
302       byte [] regionBytes = Bytes.toBytes(this.region);
303       while (true) {
304         try {
305           long seq = appendEntry(writer, TABLE_NAME, regionBytes, ("r" + editsCount.get()).getBytes(),
306             regionBytes, QUALIFIER, VALUE, 0);
307           long count = editsCount.incrementAndGet();
308           flushToConsole(getName() + " sync count=" + count + ", seq=" + seq);
309           try {
310             Thread.sleep(1);
311           } catch (InterruptedException e) {
312             //
313           }
314         } catch (IOException ex) {
315           flushToConsole(getName() + " ex " + ex.toString());
316           if (ex instanceof RemoteException) {
317             flushToConsole("Juliet: got RemoteException " + ex.getMessage() +
318               " while writing " + (editsCount.get() + 1));
319           } else {
320             flushToConsole(getName() + " failed to write....at " + editsCount.get());
321             assertTrue("Failed to write " + editsCount.get(), false);
322           }
323           break;
324         } catch (Throwable t) {
325           flushToConsole(getName() + " HOW? " + t);
326           t.printStackTrace();
327           break;
328         }
329       }
330       flushToConsole(getName() + " Writer exiting");
331     }
332   }
333 
334   /**
335    * @throws IOException
336    * @see https://issues.apache.org/jira/browse/HBASE-3020
337    */
338   @Test (timeout=300000)
339   public void testRecoveredEditsPathForMeta() throws IOException {
340     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
341     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
342     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
343     Path regiondir = new Path(tdir,
344         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
345     fs.mkdirs(regiondir);
346     long now = System.currentTimeMillis();
347     HLog.Entry entry =
348         new HLog.Entry(new HLogKey(encoded,
349             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
350       new WALEdit());
351     Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
352     String parentOfParent = p.getParent().getParent().getName();
353     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
354   }
355 
356   /**
357    * Test old recovered edits file doesn't break HLogSplitter.
358    * This is useful in upgrading old instances.
359    */
360   @Test (timeout=300000)
361   public void testOldRecoveredEditsFileSidelined() throws IOException {
362     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
363     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
364     Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
365     Path regiondir = new Path(tdir,
366         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
367     fs.mkdirs(regiondir);
368     long now = System.currentTimeMillis();
369     HLog.Entry entry =
370         new HLog.Entry(new HLogKey(encoded,
371             TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
372       new WALEdit());
373     Path parent = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
374     assertEquals(parent.getName(), HConstants.RECOVERED_EDITS_DIR);
375     fs.createNewFile(parent); // create a recovered.edits file
376 
377     Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, true);
378     String parentOfParent = p.getParent().getParent().getName();
379     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
380     HLogFactory.createRecoveredEditsWriter(fs, p, conf).close();
381   }
382 
383   @Test (timeout=300000)
384   public void testSplitPreservesEdits() throws IOException{
385     final String REGION = "region__1";
386     REGIONS.removeAll(REGIONS);
387     REGIONS.add(REGION);
388 
389     generateHLogs(1, 10, -1);
390     fs.initialize(fs.getUri(), conf);
391     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
392     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
393     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
394     assertEquals(1, splitLog.length);
395 
396     assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog[0]));
397   }
398 
399 
400   @Test (timeout=300000)
401   public void testEmptyLogFiles() throws IOException {
402 
403     injectEmptyFile(".empty", true);
404     generateHLogs(Integer.MAX_VALUE);
405     injectEmptyFile("empty", true);
406 
407     // make fs act as a different client now
408     // initialize will create a new DFSClient with a new client ID
409     fs.initialize(fs.getUri(), conf);
410 
411     int expectedFiles = fs.listStatus(HLOGDIR).length - 2; // less 2 empty files
412     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
413     for (String region : REGIONS) {
414       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
415       assertEquals(expectedFiles, logfiles.length);
416       int count = 0;
417       for (Path logfile: logfiles) {
418         count += countHLog(logfile, fs, conf);
419       }
420       assertEquals(NUM_WRITERS * ENTRIES, count);
421     }
422   }
423 
424 
425   @Test (timeout=300000)
426   public void testEmptyOpenLogFiles() throws IOException {
427     injectEmptyFile(".empty", false);
428     generateHLogs(Integer.MAX_VALUE);
429     injectEmptyFile("empty", false);
430 
431     // make fs act as a different client now
432     // initialize will create a new DFSClient with a new client ID
433     fs.initialize(fs.getUri(), conf);
434 
435     int expectedFiles = fs.listStatus(HLOGDIR).length - 2 ; // less 2 empty files
436     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
437     for (String region : REGIONS) {
438       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
439       assertEquals(expectedFiles, logfiles.length);
440       int count = 0;
441       for (Path logfile: logfiles) {
442         count += countHLog(logfile, fs, conf);
443       }
444       assertEquals(NUM_WRITERS * ENTRIES, count);
445     }
446   }
447 
448   @Test (timeout=300000)
449   public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
450     // generate logs but leave hlog.dat.5 open.
451     generateHLogs(5);
452 
453     fs.initialize(fs.getUri(), conf);
454 
455     int expectedFiles = fs.listStatus(HLOGDIR).length;
456     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
457     for (String region : REGIONS) {
458       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
459       assertEquals(expectedFiles, logfiles.length);
460       int count = 0;
461       for (Path logfile: logfiles) {
462         count += countHLog(logfile, fs, conf);
463       }
464       assertEquals(NUM_WRITERS * ENTRIES, count);
465     }
466   }
467 
468 
469   @Test (timeout=300000)
470   public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
471     conf.setBoolean(HBASE_SKIP_ERRORS, true);
472     generateHLogs(Integer.MAX_VALUE);
473     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
474             Corruptions.APPEND_GARBAGE, true, fs);
475     fs.initialize(fs.getUri(), conf);
476 
477     int expectedFiles = fs.listStatus(HLOGDIR).length;
478     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
479     for (String region : REGIONS) {
480       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
481       assertEquals(expectedFiles, logfiles.length);
482       int count = 0;
483       for (Path logfile: logfiles) {
484         count += countHLog(logfile, fs, conf);
485       }
486       assertEquals(NUM_WRITERS * ENTRIES, count);
487     }
488   }
489 
490   @Test (timeout=300000)
491   public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
492     conf.setBoolean(HBASE_SKIP_ERRORS, true);
493     generateHLogs(Integer.MAX_VALUE);
494     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
495             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
496     fs.initialize(fs.getUri(), conf);
497 
498     int expectedFiles = fs.listStatus(HLOGDIR).length - 1; // less 1 corrupted file
499     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
500     for (String region : REGIONS) {
501       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
502       assertEquals(expectedFiles, logfiles.length);
503       int count = 0;
504       for (Path logfile: logfiles) {
505         count += countHLog(logfile, fs, conf);
506       }
507       assertEquals((NUM_WRITERS - 1) * ENTRIES, count);
508     }
509   }
510 
511   @Test (timeout=300000)
512   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
513     conf.setBoolean(HBASE_SKIP_ERRORS, true);
514     generateHLogs(Integer.MAX_VALUE);
515     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
516             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
517     fs.initialize(fs.getUri(), conf);
518 
519     int expectedFiles = fs.listStatus(HLOGDIR).length;
520     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
521     for (String region : REGIONS) {
522       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
523       assertEquals(expectedFiles, logfiles.length);
524       int count = 0;
525       for (Path logfile: logfiles) {
526         count += countHLog(logfile, fs, conf);
527       }
528       // the entries in the original logs are alternating regions
529       // considering the sequence file header, the middle corruption should
530       // affect at least half of the entries
531       int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
532       int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
533       assertTrue("The file up to the corrupted area hasn't been parsed",
534               goodEntries + firstHalfEntries <= count);
535     }
536   }
537 
538   @Test (timeout=300000)
539   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
540     conf.setBoolean(HBASE_SKIP_ERRORS, true);
541     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
542         Reader.class);
543     InstrumentedSequenceFileLogWriter.activateFailure = false;
544     HLogFactory.resetLogReaderClass();
545 
546     try {
547     Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
548       conf.setClass("hbase.regionserver.hlog.reader.impl",
549           FaultySequenceFileLogReader.class, HLog.Reader.class);
550       for (FaultySequenceFileLogReader.FailureType  failureType : FaultySequenceFileLogReader.FailureType.values()) {
551         conf.set("faultysequencefilelogreader.failuretype", failureType.name());
552         generateHLogs(1, ENTRIES, -1);
553         fs.initialize(fs.getUri(), conf);
554         HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
555         FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
556         assertEquals("expected a different file", c1.getName(), archivedLogs[0]
557             .getPath().getName());
558         assertEquals(archivedLogs.length, 1);
559         fs.delete(new Path(OLDLOGDIR, HLOG_FILE_PREFIX + "0"), false);
560       }
561     } finally {
562       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
563           Reader.class);
564       HLogFactory.resetLogReaderClass();
565     }
566   }
567 
568   @Test (timeout=300000, expected = IOException.class)
569   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
570       throws IOException {
571     conf.setBoolean(HBASE_SKIP_ERRORS, false);
572     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
573         Reader.class);
574     InstrumentedSequenceFileLogWriter.activateFailure = false;
575     HLogFactory.resetLogReaderClass();
576 
577     try {
578       conf.setClass("hbase.regionserver.hlog.reader.impl",
579           FaultySequenceFileLogReader.class, HLog.Reader.class);
580       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
581       generateHLogs(Integer.MAX_VALUE);
582       fs.initialize(fs.getUri(), conf);
583       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
584     } finally {
585       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
586           Reader.class);
587       HLogFactory.resetLogReaderClass();
588     }
589   }
590 
591   @Test (timeout=300000)
592   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
593       throws IOException {
594     conf.setBoolean(HBASE_SKIP_ERRORS, false);
595     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
596         Reader.class);
597     InstrumentedSequenceFileLogWriter.activateFailure = false;
598     HLogFactory.resetLogReaderClass();
599 
600     try {
601       conf.setClass("hbase.regionserver.hlog.reader.impl",
602           FaultySequenceFileLogReader.class, HLog.Reader.class);
603       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
604       generateHLogs(-1);
605       fs.initialize(fs.getUri(), conf);
606       try {
607         HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
608       } catch (IOException e) {
609         assertEquals(
610             "if skip.errors is false all files should remain in place",
611             NUM_WRITERS, fs.listStatus(HLOGDIR).length);
612       }
613     } finally {
614       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
615           Reader.class);
616       HLogFactory.resetLogReaderClass();
617     }
618   }
619 
620   @Test (timeout=300000)
621   public void testEOFisIgnored() throws IOException {
622     conf.setBoolean(HBASE_SKIP_ERRORS, false);
623 
624     final String REGION = "region__1";
625     REGIONS.removeAll(REGIONS);
626     REGIONS.add(REGION);
627 
628     int entryCount = 10;
629     Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
630     generateHLogs(1, entryCount, -1);
631     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
632 
633     fs.initialize(fs.getUri(), conf);
634     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
635 
636     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
637     assertEquals(1, splitLog.length);
638 
639     int actualCount = 0;
640     HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
641     @SuppressWarnings("unused")
642     HLog.Entry entry;
643     while ((entry = in.next()) != null) ++actualCount;
644     assertEquals(entryCount-1, actualCount);
645 
646     // should not have stored the EOF files as corrupt
647     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
648     assertEquals(archivedLogs.length, 0);
649   }
650 
651   @Test (timeout=300000)
652   public void testCorruptWALTrailer() throws IOException {
653     conf.setBoolean(HBASE_SKIP_ERRORS, false);
654 
655     final String REGION = "region__1";
656     REGIONS.removeAll(REGIONS);
657     REGIONS.add(REGION);
658 
659     int entryCount = 10;
660     Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
661     generateHLogs(1, entryCount, -1);
662     corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
663 
664     fs.initialize(fs.getUri(), conf);
665     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
666 
667     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
668     assertEquals(1, splitLog.length);
669 
670     int actualCount = 0;
671     HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
672     @SuppressWarnings("unused")
673     HLog.Entry entry;
674     while ((entry = in.next()) != null) ++actualCount;
675     assertEquals(entryCount, actualCount);
676 
677     // should not have stored the EOF files as corrupt
678     FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
679     assertEquals(archivedLogs.length, 0);
680   }
681 
682   @Test (timeout=300000)
683   public void testLogsGetArchivedAfterSplit() throws IOException {
684     conf.setBoolean(HBASE_SKIP_ERRORS, false);
685     generateHLogs(-1);
686     fs.initialize(fs.getUri(), conf);
687     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
688     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
689     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
690   }
691 
692   @Test (timeout=300000)
693   public void testSplit() throws IOException {
694     generateHLogs(-1);
695     fs.initialize(fs.getUri(), conf);
696 
697     int expectedFiles = fs.listStatus(HLOGDIR).length;
698     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
699     for (String region : REGIONS) {
700       Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
701       assertEquals(expectedFiles, logfiles.length);
702       int count = 0;
703       for (Path logfile: logfiles) {
704         count += countHLog(logfile, fs, conf);
705       }
706       assertEquals(NUM_WRITERS * ENTRIES, count);
707     }
708   }
709 
710   @Test (timeout=300000)
711   public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
712   throws IOException {
713     generateHLogs(-1);
714     fs.initialize(fs.getUri(), conf);
715     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
716     FileStatus [] statuses = null;
717     try {
718       statuses = fs.listStatus(HLOGDIR);
719       if (statuses != null) {
720         Assert.fail("Files left in log dir: " +
721             Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
722       }
723     } catch (FileNotFoundException e) {
724       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
725     }
726   }
727 
728   @Test(timeout=300000, expected = IOException.class)
729   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
730     //leave 5th log open so we could append the "trap"
731     HLog.Writer [] writer = generateHLogs(4);
732 
733     fs.initialize(fs.getUri(), conf);
734 
735     String region = "break";
736     Path regiondir = new Path(TABLEDIR, region);
737     fs.mkdirs(regiondir);
738 
739     InstrumentedSequenceFileLogWriter.activateFailure = false;
740     appendEntry(writer[4], TABLE_NAME, Bytes.toBytes(region),
741         ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
742     writer[4].close();
743 
744     try {
745       InstrumentedSequenceFileLogWriter.activateFailure = true;
746       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
747     } catch (IOException e) {
748       assertTrue(e.getMessage().
749         contains("This exception is instrumented and should only be thrown for testing"));
750       throw e;
751     } finally {
752       InstrumentedSequenceFileLogWriter.activateFailure = false;
753     }
754   }
755 
756 
757   // @Test TODO this test has been disabled since it was created!
758   // It currently fails because the second split doesn't output anything
759   // -- because there are no region dirs after we move aside the first
760   // split result
761   public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
762 
763     REGIONS.removeAll(REGIONS);
764     for (int i=0; i<100; i++) {
765       REGIONS.add("region__"+i);
766     }
767 
768     generateHLogs(1, 100, -1);
769     fs.initialize(fs.getUri(), conf);
770 
771     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
772     fs.rename(OLDLOGDIR, HLOGDIR);
773     Path firstSplitPath = new Path(HBASEDIR, TABLE_NAME+ ".first");
774     Path splitPath = new Path(HBASEDIR, TABLE_NAME.getNameAsString());
775     fs.rename(splitPath,
776             firstSplitPath);
777 
778     fs.initialize(fs.getUri(), conf);
779     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
780     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
781   }
782 
783   @Test (timeout=300000)
784   public void testSplitDeletedRegion() throws IOException {
785     REGIONS.removeAll(REGIONS);
786     String region = "region_that_splits";
787     REGIONS.add(region);
788 
789     generateHLogs(1);
790     fs.initialize(fs.getUri(), conf);
791 
792     Path regiondir = new Path(TABLEDIR, region);
793     fs.delete(regiondir, true);
794     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
795     assertFalse(fs.exists(regiondir));
796   }
797 
798   @Test (timeout=300000)
799   public void testIOEOnOutputThread() throws Exception {
800     conf.setBoolean(HBASE_SKIP_ERRORS, false);
801 
802     generateHLogs(-1);
803     fs.initialize(fs.getUri(), conf);
804     FileStatus[] logfiles = fs.listStatus(HLOGDIR);
805     assertTrue("There should be some log file",
806       logfiles != null && logfiles.length > 0);
807     // Set up a splitter that will throw an IOE on the output side
808     HLogSplitter logSplitter = new HLogSplitter(
809         conf, HBASEDIR, fs, null, null, this.mode) {
810       protected HLog.Writer createWriter(FileSystem fs,
811           Path logfile, Configuration conf) throws IOException {
812         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
813         Mockito.doThrow(new IOException("Injected")).when(
814           mockWriter).append(Mockito.<HLog.Entry>any());
815         return mockWriter;
816       }
817     };
818     // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
819     // the thread dumping in a background thread so it does not hold up the test.
820     final AtomicBoolean stop = new AtomicBoolean(false);
821     final Thread someOldThread = new Thread("Some-old-thread") {
822       @Override
823       public void run() {
824         while(!stop.get()) Threads.sleep(10);
825       }
826     };
827     someOldThread.setDaemon(true);
828     someOldThread.start();
829     final Thread t = new Thread("Background-thread-dumper") {
830       public void run() {
831         try {
832           Threads.threadDumpingIsAlive(someOldThread);
833         } catch (InterruptedException e) {
834           e.printStackTrace();
835         }
836       }
837     };
838     t.setDaemon(true);
839     t.start();
840     try {
841       logSplitter.splitLogFile(logfiles[0], null);
842       fail("Didn't throw!");
843     } catch (IOException ioe) {
844       assertTrue(ioe.toString().contains("Injected"));
845     } finally {
846       // Setting this to true will turn off the background thread dumper.
847       stop.set(true);
848     }
849   }
850 
851   // Test for HBASE-3412
852   @Test (timeout=300000)
853   public void testMovedHLogDuringRecovery() throws Exception {
854     generateHLogs(-1);
855 
856     fs.initialize(fs.getUri(), conf);
857 
858     // This partial mock will throw LEE for every file simulating
859     // files that were moved
860     FileSystem spiedFs = Mockito.spy(fs);
861     // The "File does not exist" part is very important,
862     // that's how it comes out of HDFS
863     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
864         when(spiedFs).append(Mockito.<Path>any());
865 
866     try {
867       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
868       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
869       assertFalse(fs.exists(HLOGDIR));
870     } catch (IOException e) {
871       fail("There shouldn't be any exception but: " + e.toString());
872     }
873   }
874 
875   @Test (timeout=300000)
876   public void testRetryOpenDuringRecovery() throws Exception {
877     generateHLogs(-1);
878 
879     fs.initialize(fs.getUri(), conf);
880 
881     FileSystem spiedFs = Mockito.spy(fs);
882     // The "Cannot obtain block length", "Could not obtain the last block",
883     // and "Blocklist for [^ ]* has changed.*" part is very important,
884     // that's how it comes out of HDFS. If HDFS changes the exception
885     // message, this test needs to be adjusted accordingly.
886     //
887     // When DFSClient tries to open a file, HDFS needs to locate
888     // the last block of the file and get its length. However, if the
889     // last block is under recovery, HDFS may have problem to obtain
890     // the block length, in which case, retry may help.
891     Mockito.doAnswer(new Answer<FSDataInputStream>() {
892       private final String[] errors = new String[] {
893         "Cannot obtain block length", "Could not obtain the last block",
894         "Blocklist for " + OLDLOGDIR + " has changed"};
895       private int count = 0;
896 
897       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
898             if (count < 3) {
899                 throw new IOException(errors[count++]);
900             }
901             return (FSDataInputStream)invocation.callRealMethod();
902         }
903     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
904 
905     try {
906       HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
907       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
908       assertFalse(fs.exists(HLOGDIR));
909     } catch (IOException e) {
910       fail("There shouldn't be any exception but: " + e.toString());
911     }
912   }
913 
914   @Test (timeout=300000)
915   public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
916     generateHLogs(1, 10, -1);
917     FileStatus logfile = fs.listStatus(HLOGDIR)[0];
918     fs.initialize(fs.getUri(), conf);
919 
920     final AtomicInteger count = new AtomicInteger();
921 
922     CancelableProgressable localReporter
923       = new CancelableProgressable() {
924         @Override
925         public boolean progress() {
926           count.getAndIncrement();
927           return false;
928         }
929       };
930 
931     FileSystem spiedFs = Mockito.spy(fs);
932     Mockito.doAnswer(new Answer<FSDataInputStream>() {
933       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
934         Thread.sleep(1500); // Sleep a while and wait report status invoked
935         return (FSDataInputStream)invocation.callRealMethod();
936       }
937     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
938 
939     try {
940       conf.setInt("hbase.splitlog.report.period", 1000);
941       boolean ret = HLogSplitter.splitLogFile(
942         HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
943       assertFalse("Log splitting should failed", ret);
944       assertTrue(count.get() > 0);
945     } catch (IOException e) {
946       fail("There shouldn't be any exception but: " + e.toString());
947     } finally {
948       // reset it back to its default value
949       conf.setInt("hbase.splitlog.report.period", 59000);
950     }
951   }
952 
953   /**
954    * Test log split process with fake data and lots of edits to trigger threading
955    * issues.
956    */
957   @Test (timeout=300000)
958   public void testThreading() throws Exception {
959     doTestThreading(20000, 128*1024*1024, 0);
960   }
961 
962   /**
963    * Test blocking behavior of the log split process if writers are writing slower
964    * than the reader is reading.
965    */
966   @Test (timeout=300000)
967   public void testThreadingSlowWriterSmallBuffer() throws Exception {
968     doTestThreading(200, 1024, 50);
969   }
970 
971   /**
972    * Sets up a log splitter with a mock reader and writer. The mock reader generates
973    * a specified number of edits spread across 5 regions. The mock writer optionally
974    * sleeps for each edit it is fed.
975    * *
976    * After the split is complete, verifies that the statistics show the correct number
977    * of edits output into each region.
978    *
979    * @param numFakeEdits number of fake edits to push through pipeline
980    * @param bufferSize size of in-memory buffer
981    * @param writerSlowness writer threads will sleep this many ms per edit
982    */
983   private void doTestThreading(final int numFakeEdits,
984       final int bufferSize,
985       final int writerSlowness) throws Exception {
986 
987     Configuration localConf = new Configuration(conf);
988     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
989 
990     // Create a fake log file (we'll override the reader to produce a stream of edits)
991     Path logPath = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake");
992     FSDataOutputStream out = fs.create(logPath);
993     out.close();
994 
995     // Make region dirs for our destination regions so the output doesn't get skipped
996     final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
997     makeRegionDirs(fs, regions);
998 
999     // Create a splitter that reads and writes the data without touching disk
1000     HLogSplitter logSplitter = new HLogSplitter(
1001         localConf, HBASEDIR, fs, null, null, this.mode) {
1002 
1003       /* Produce a mock writer that doesn't write anywhere */
1004       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
1005       throws IOException {
1006         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
1007         Mockito.doAnswer(new Answer<Void>() {
1008           int expectedIndex = 0;
1009 
1010           @Override
1011           public Void answer(InvocationOnMock invocation) {
1012             if (writerSlowness > 0) {
1013               try {
1014                 Thread.sleep(writerSlowness);
1015               } catch (InterruptedException ie) {
1016                 Thread.currentThread().interrupt();
1017               }
1018             }
1019             HLog.Entry entry = (Entry) invocation.getArguments()[0];
1020             WALEdit edit = entry.getEdit();
1021             List<KeyValue> keyValues = edit.getKeyValues();
1022             assertEquals(1, keyValues.size());
1023             KeyValue kv = keyValues.get(0);
1024 
1025             // Check that the edits come in the right order.
1026             assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
1027             expectedIndex++;
1028             return null;
1029           }
1030         }).when(mockWriter).append(Mockito.<HLog.Entry>any());
1031         return mockWriter;
1032       }
1033 
1034       /* Produce a mock reader that generates fake entries */
1035       protected Reader getReader(FileSystem fs, Path curLogFile,
1036           Configuration conf, CancelableProgressable reporter) throws IOException {
1037         Reader mockReader = Mockito.mock(Reader.class);
1038         Mockito.doAnswer(new Answer<HLog.Entry>() {
1039           int index = 0;
1040 
1041           @Override
1042           public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
1043             if (index >= numFakeEdits) return null;
1044 
1045             // Generate r0 through r4 in round robin fashion
1046             int regionIdx = index % regions.size();
1047             byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
1048 
1049             HLog.Entry ret = createTestEntry(TABLE_NAME, region,
1050                 Bytes.toBytes((int)(index / regions.size())),
1051                 FAMILY, QUALIFIER, VALUE, index);
1052             index++;
1053             return ret;
1054           }
1055         }).when(mockReader).next();
1056         return mockReader;
1057       }
1058     };
1059 
1060     logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
1061 
1062     // Verify number of written edits per region
1063     Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1064     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
1065       LOG.info("Got " + entry.getValue() + " output edits for region " +
1066           Bytes.toString(entry.getKey()));
1067       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
1068     }
1069     assertEquals(regions.size(), outputCounts.size());
1070   }
1071 
1072   // HBASE-2312: tests the case where a RegionServer enters a GC pause,
1073   // comes back online after the master declared it dead and started to split.
1074   // Want log rolling after a master split to fail
1075   @Test (timeout=300000)
1076   @Ignore("Need HADOOP-6886, HADOOP-6840, & HDFS-617 for this. HDFS 0.20.205.1+ should have this")
1077   public void testLogRollAfterSplitStart() throws IOException {
1078     HLog log = null;
1079     String logName = "testLogRollAfterSplitStart";
1080     Path thisTestsDir = new Path(HBASEDIR, logName);
1081 
1082     try {
1083       // put some entries in an HLog
1084       TableName tableName =
1085           TableName.valueOf(this.getClass().getName());
1086       HRegionInfo regioninfo = new HRegionInfo(tableName,
1087           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
1088       log = HLogFactory.createHLog(fs, HBASEDIR, logName, conf);
1089       final AtomicLong sequenceId = new AtomicLong(1);
1090 
1091       final int total = 20;
1092       for (int i = 0; i < total; i++) {
1093         WALEdit kvs = new WALEdit();
1094         kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
1095         HTableDescriptor htd = new HTableDescriptor(tableName);
1096         htd.addFamily(new HColumnDescriptor("column"));
1097         log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
1098       }
1099       // Send the data to HDFS datanodes and close the HDFS writer
1100       log.sync();
1101       ((FSHLog) log).cleanupCurrentWriter(log.getFilenum());
1102 
1103       /* code taken from ProcessServerShutdown.process()
1104        * handles RS shutdowns (as observed by the Master)
1105        */
1106       // rename the directory so a rogue RS doesn't create more HLogs
1107       Path rsSplitDir = new Path(thisTestsDir.getParent(),
1108                                  thisTestsDir.getName() + "-splitting");
1109       fs.rename(thisTestsDir, rsSplitDir);
1110       LOG.debug("Renamed region directory: " + rsSplitDir);
1111 
1112       // Process the old log files
1113       HLogSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf);
1114 
1115       // Now, try to roll the HLog and verify failure
1116       try {
1117         log.rollWriter();
1118         Assert.fail("rollWriter() did not throw any exception.");
1119       } catch (IOException ioe) {
1120         if (ioe.getCause().getMessage().contains("FileNotFound")) {
1121           LOG.info("Got the expected exception: ", ioe.getCause());
1122         } else {
1123           Assert.fail("Unexpected exception: " + ioe);
1124         }
1125       }
1126     } finally {
1127       if (log != null) {
1128         log.close();
1129       }
1130       if (fs.exists(thisTestsDir)) {
1131         fs.delete(thisTestsDir, true);
1132       }
1133     }
1134   }
1135 
1136   /**
1137    * This thread will keep adding new log files
1138    * It simulates a region server that was considered dead but woke up and wrote
1139    * some more to a new hlog
1140    */
1141   class ZombieNewLogWriterRegionServer extends Thread {
1142     AtomicBoolean stop;
1143     CountDownLatch latch;
1144     public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
1145       super("ZombieNewLogWriterRegionServer");
1146       this.latch = latch;
1147       this.stop = stop;
1148     }
1149 
1150     @Override
1151     public void run() {
1152       if (stop.get()) {
1153         return;
1154       }
1155       Path tableDir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1156       Path regionDir = new Path(tableDir, REGIONS.get(0));
1157       Path recoveredEdits = new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
1158       String region = "juliet";
1159       Path julietLog = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".juliet");
1160       try {
1161 
1162         while (!fs.exists(recoveredEdits) && !stop.get()) {
1163           LOG.info("Juliet: split not started, sleeping a bit...");
1164           Threads.sleep(10);
1165         }
1166 
1167         fs.mkdirs(new Path(tableDir, region));
1168         HLog.Writer writer = HLogFactory.createWALWriter(fs,
1169           julietLog, conf);
1170         appendEntry(writer, TableName.valueOf("juliet"), ("juliet").getBytes(),
1171             ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
1172         writer.close();
1173         LOG.info("Juliet file creator: created file " + julietLog);
1174         latch.countDown();
1175       } catch (IOException e1) {
1176         LOG.error("Failed to create file " + julietLog, e1);
1177         assertTrue("Failed to create file " + julietLog, false);
1178       }
1179     }
1180   }
1181 
1182   @Test (timeout=300000)
1183   public void testSplitLogFileWithOneRegion() throws IOException {
1184     LOG.info("testSplitLogFileWithOneRegion");
1185     final String REGION = "region__1";
1186     REGIONS.removeAll(REGIONS);
1187     REGIONS.add(REGION);
1188 
1189     generateHLogs(1, 10, -1);
1190     fs.initialize(fs.getUri(), conf);
1191     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1192 
1193     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
1194     Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
1195     assertEquals(1, splitLog.length);
1196 
1197     assertEquals(true, logsAreEqual(originalLog, splitLog[0]));
1198   }
1199 
1200   @Test (timeout=300000)
1201   public void testSplitLogFileDeletedRegionDir() throws IOException {
1202     LOG.info("testSplitLogFileDeletedRegionDir");
1203     final String REGION = "region__1";
1204     REGIONS.removeAll(REGIONS);
1205     REGIONS.add(REGION);
1206 
1207     generateHLogs(1, 10, -1);
1208     fs.initialize(fs.getUri(), conf);
1209 
1210     Path regiondir = new Path(TABLEDIR, REGION);
1211     LOG.info("Region directory is" + regiondir);
1212     fs.delete(regiondir, true);
1213 
1214     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1215 
1216     assertTrue(!fs.exists(regiondir));
1217     assertTrue(true);
1218   }
1219 
1220   @Test (timeout=300000)
1221   public void testSplitLogFileEmpty() throws IOException {
1222     LOG.info("testSplitLogFileEmpty");
1223     injectEmptyFile(".empty", true);
1224 
1225     fs.initialize(fs.getUri(), conf);
1226 
1227     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1228     Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1229     assertFalse(fs.exists(tdir));
1230 
1231     assertEquals(0, countHLog(fs.listStatus(OLDLOGDIR)[0].getPath(), fs, conf));
1232   }
1233 
1234   @Test (timeout=300000)
1235   public void testSplitLogFileMultipleRegions() throws IOException {
1236     LOG.info("testSplitLogFileMultipleRegions");
1237     generateHLogs(1, 10, -1);
1238     fs.initialize(fs.getUri(), conf);
1239 
1240     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1241     for (String region : REGIONS) {
1242       Path[] recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
1243       assertEquals(1, recovered.length);
1244       assertEquals(10, countHLog(recovered[0], fs, conf));
1245     }
1246   }
1247 
1248   @Test (timeout=300000)
1249   public void testSplitLogFileFirstLineCorruptionLog()
1250   throws IOException {
1251     conf.setBoolean(HBASE_SKIP_ERRORS, true);
1252     generateHLogs(1, 10, -1);
1253     FileStatus logfile = fs.listStatus(HLOGDIR)[0];
1254 
1255     corruptHLog(logfile.getPath(),
1256         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
1257 
1258     fs.initialize(fs.getUri(), conf);
1259     HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
1260 
1261     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
1262         "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
1263     assertEquals(1, fs.listStatus(corruptDir).length);
1264   }
1265 
1266   /**
1267    * @throws IOException
1268    * @see https://issues.apache.org/jira/browse/HBASE-4862
1269    */
1270   @Test (timeout=300000)
1271   public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1272     LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1273     // Generate hlogs for our destination region
1274     String regionName = "r0";
1275     final Path regiondir = new Path(TABLEDIR, regionName);
1276     REGIONS = new ArrayList<String>();
1277     REGIONS.add(regionName);
1278     generateHLogs(-1);
1279 
1280     HLogFactory.createHLog(fs, regiondir, regionName, conf);
1281     FileStatus[] logfiles = fs.listStatus(HLOGDIR);
1282     assertTrue("There should be some log file",
1283       logfiles != null && logfiles.length > 0);
1284 
1285     HLogSplitter logSplitter = new HLogSplitter(
1286         conf, HBASEDIR, fs, null, null, this.mode) {
1287       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
1288       throws IOException {
1289         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);
1290         // After creating writer, simulate region's
1291         // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1292         // region and delete them, excluding files with '.temp' suffix.
1293         NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
1294         if (files != null && !files.isEmpty()) {
1295           for (Path file : files) {
1296             if (!this.fs.delete(file, false)) {
1297               LOG.error("Failed delete of " + file);
1298             } else {
1299               LOG.debug("Deleted recovered.edits file=" + file);
1300             }
1301           }
1302         }
1303         return writer;
1304       }
1305     };
1306     try{
1307       logSplitter.splitLogFile(logfiles[0], null);
1308     } catch (IOException e) {
1309       LOG.info(e);
1310       Assert.fail("Throws IOException when spliting "
1311           + "log, it is most likely because writing file does not "
1312           + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1313     }
1314     if (fs.exists(CORRUPTDIR)) {
1315       if (fs.listStatus(CORRUPTDIR).length > 0) {
1316         Assert.fail("There are some corrupt logs, "
1317                 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1318       }
1319     }
1320   }
1321 
1322   private static void flushToConsole(String s) {
1323     System.out.println(s);
1324     System.out.flush();
1325   }
1326 
1327 
1328   private HLog.Writer [] generateHLogs(int leaveOpen) throws IOException {
1329     return generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
1330   }
1331 
1332   private HLog.Writer [] generateHLogs(final int writers, final int entries, final int leaveOpen) throws IOException {
1333     return generateHLogs((DistributedFileSystem)this.fs, writers, entries, leaveOpen);
1334   }
1335 
1336   private static void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
1337     for (String region : regions) {
1338       flushToConsole("Creating dir for region " + region);
1339       fs.mkdirs(new Path(TABLEDIR, region));
1340     }
1341   }
1342 
1343   private static HLog.Writer [] generateHLogs(final DistributedFileSystem dfs, int writers, int entries, int leaveOpen)
1344   throws IOException {
1345     makeRegionDirs(dfs, REGIONS);
1346     dfs.mkdirs(HLOGDIR);
1347     HLog.Writer [] ws = new HLog.Writer[writers];
1348     int seq = 0;
1349     for (int i = 0; i < writers; i++) {
1350       ws[i] = HLogFactory.createWALWriter(dfs, new Path(HLOGDIR, HLOG_FILE_PREFIX + i), dfs.getConf());
1351       for (int j = 0; j < entries; j++) {
1352         int prefix = 0;
1353         for (String region : REGIONS) {
1354           String row_key = region + prefix++ + i + j;
1355           appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq++);
1356         }
1357       }
1358       if (i != leaveOpen) {
1359         ws[i].close();
1360         LOG.info("Closing writer " + i);
1361       }
1362     }
1363     return ws;
1364   }
1365 
1366   private Path[] getLogForRegion(Path rootdir, TableName table, String region)
1367   throws IOException {
1368     Path tdir = FSUtils.getTableDir(rootdir, table);
1369     @SuppressWarnings("deprecation")
1370     Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
1371       Bytes.toString(region.getBytes())));
1372     FileStatus [] files = this.fs.listStatus(editsdir);
1373     Path[] paths = new Path[files.length];
1374     for (int i = 0; i < files.length; i++) {
1375       paths[i] = files[i].getPath();
1376     }
1377     return paths;
1378   }
1379 
1380   private void corruptHLog(Path path, Corruptions corruption, boolean close,
1381                            FileSystem fs) throws IOException {
1382 
1383     FSDataOutputStream out;
1384     int fileSize = (int) fs.listStatus(path)[0].getLen();
1385 
1386     FSDataInputStream in = fs.open(path);
1387     byte[] corrupted_bytes = new byte[fileSize];
1388     in.readFully(0, corrupted_bytes, 0, fileSize);
1389     in.close();
1390 
1391     switch (corruption) {
1392       case APPEND_GARBAGE:
1393         fs.delete(path, false);
1394         out = fs.create(path);
1395         out.write(corrupted_bytes);
1396         out.write("-----".getBytes());
1397         closeOrFlush(close, out);
1398         break;
1399 
1400       case INSERT_GARBAGE_ON_FIRST_LINE:
1401         fs.delete(path, false);
1402         out = fs.create(path);
1403         out.write(0);
1404         out.write(corrupted_bytes);
1405         closeOrFlush(close, out);
1406         break;
1407 
1408       case INSERT_GARBAGE_IN_THE_MIDDLE:
1409         fs.delete(path, false);
1410         out = fs.create(path);
1411         int middle = (int) Math.floor(corrupted_bytes.length / 2);
1412         out.write(corrupted_bytes, 0, middle);
1413         out.write(0);
1414         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1415         closeOrFlush(close, out);
1416         break;
1417 
1418       case TRUNCATE:
1419         fs.delete(path, false);
1420         out = fs.create(path);
1421         out.write(corrupted_bytes, 0, fileSize
1422           - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1423         closeOrFlush(close, out);
1424         break;
1425 
1426       case TRUNCATE_TRAILER:
1427         fs.delete(path, false);
1428         out = fs.create(path);
1429         out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1430         closeOrFlush(close, out);
1431         break;
1432     }
1433   }
1434 
1435   private void closeOrFlush(boolean close, FSDataOutputStream out)
1436   throws IOException {
1437     if (close) {
1438       out.close();
1439     } else {
1440       Method syncMethod = null;
1441       try {
1442         syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
1443       } catch (NoSuchMethodException e) {
1444         try {
1445           syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
1446         } catch (NoSuchMethodException ex) {
1447           throw new IOException("This version of Hadoop supports " +
1448               "neither Syncable.sync() nor Syncable.hflush().");
1449         }
1450       }
1451       try {
1452         syncMethod.invoke(out, new Object[]{});
1453       } catch (Exception e) {
1454         throw new IOException(e);
1455       }
1456       // Not in 0out.hflush();
1457     }
1458   }
1459 
1460   @SuppressWarnings("unused")
1461   private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
1462     HLog.Entry entry;
1463     HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1464     while ((entry = in.next()) != null) {
1465       System.out.println(entry);
1466     }
1467   }
1468 
1469   private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
1470     int count = 0;
1471     HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1472     while (in.next() != null) {
1473       count++;
1474     }
1475     return count;
1476   }
1477 
1478 
1479   public static long appendEntry(HLog.Writer writer, TableName table, byte[] region,
1480                           byte[] row, byte[] family, byte[] qualifier,
1481                           byte[] value, long seq)
1482           throws IOException {
1483     LOG.info(Thread.currentThread().getName() + " append");
1484     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1485     LOG.info(Thread.currentThread().getName() + " sync");
1486     writer.sync();
1487     return seq;
1488   }
1489 
1490   private static HLog.Entry createTestEntry(
1491       TableName table, byte[] region,
1492       byte[] row, byte[] family, byte[] qualifier,
1493       byte[] value, long seq) {
1494     long time = System.nanoTime();
1495     WALEdit edit = new WALEdit();
1496     seq++;
1497     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
1498     return new HLog.Entry(new HLogKey(region, table, seq, time,
1499         HConstants.DEFAULT_CLUSTER_ID), edit);
1500   }
1501 
1502 
1503   private void injectEmptyFile(String suffix, boolean closeFile)
1504           throws IOException {
1505     HLog.Writer writer = HLogFactory.createWALWriter(
1506         fs, new Path(HLOGDIR, HLOG_FILE_PREFIX + suffix), conf);
1507     if (closeFile) writer.close();
1508   }
1509 
1510   @SuppressWarnings("unused")
1511   private void listLogs(FileSystem fs, Path dir) throws IOException {
1512     for (FileStatus file : fs.listStatus(dir)) {
1513       System.out.println(file.getPath());
1514     }
1515 
1516   }
1517 
1518   private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
1519     FileStatus[] f1 = fs.listStatus(p1);
1520     FileStatus[] f2 = fs.listStatus(p2);
1521     assertNotNull("Path " + p1 + " doesn't exist", f1);
1522     assertNotNull("Path " + p2 + " doesn't exist", f2);
1523 
1524     System.out.println("Files in " + p1 + ": " +
1525         Joiner.on(",").join(FileUtil.stat2Paths(f1)));
1526     System.out.println("Files in " + p2 + ": " +
1527         Joiner.on(",").join(FileUtil.stat2Paths(f2)));
1528     assertEquals(f1.length, f2.length);
1529 
1530     for (int i = 0; i < f1.length; i++) {
1531       // Regions now have a directory named RECOVERED_EDITS_DIR and in here
1532       // are split edit files. In below presume only 1.
1533       Path rd1 = HLogUtil.getRegionDirRecoveredEditsDir(f1[i].getPath());
1534       FileStatus[] rd1fs = fs.listStatus(rd1);
1535       assertEquals(1, rd1fs.length);
1536       Path rd2 = HLogUtil.getRegionDirRecoveredEditsDir(f2[i].getPath());
1537       FileStatus[] rd2fs = fs.listStatus(rd2);
1538       assertEquals(1, rd2fs.length);
1539       if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
1540         return -1;
1541       }
1542     }
1543     return 0;
1544   }
1545 
1546   private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1547     HLog.Reader in1, in2;
1548     in1 = HLogFactory.createReader(fs, p1, conf);
1549     in2 = HLogFactory.createReader(fs, p2, conf);
1550     HLog.Entry entry1;
1551     HLog.Entry entry2;
1552     while ((entry1 = in1.next()) != null) {
1553       entry2 = in2.next();
1554       if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
1555               (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
1556         return false;
1557       }
1558     }
1559     return true;
1560   }
1561 }