View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
22  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
23  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
24  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
25  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
26  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
27  import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
28  import static org.junit.Assert.*;
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.List;
36  import java.util.NavigableSet;
37  import java.util.Set;
38  import java.util.TreeSet;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.Executors;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.TimeoutException;
44  import java.util.concurrent.atomic.AtomicLong;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FSDataOutputStream;
50  import org.apache.hadoop.fs.FileStatus;
51  import org.apache.hadoop.fs.FileSystem;
52  import org.apache.hadoop.fs.Path;
53  import org.apache.hadoop.hbase.HColumnDescriptor;
54  import org.apache.hadoop.hbase.TableName;
55  import org.apache.hadoop.hbase.HBaseConfiguration;
56  import org.apache.hadoop.hbase.HBaseTestingUtility;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.testclassification.LargeTests;
62  import org.apache.hadoop.hbase.MiniHBaseCluster;
63  import org.apache.hadoop.hbase.NamespaceDescriptor;
64  import org.apache.hadoop.hbase.ServerName;
65  import org.apache.hadoop.hbase.SplitLogCounters;
66  import org.apache.hadoop.hbase.Waiter;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Get;
69  import org.apache.hadoop.hbase.client.HConnectionManager;
70  import org.apache.hadoop.hbase.client.HTable;
71  import org.apache.hadoop.hbase.client.Increment;
72  import org.apache.hadoop.hbase.client.NonceGenerator;
73  import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
74  import org.apache.hadoop.hbase.client.Put;
75  import org.apache.hadoop.hbase.client.Result;
76  import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
77  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
78  import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
79  import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
80  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
82  import org.apache.hadoop.hbase.regionserver.HRegion;
83  import org.apache.hadoop.hbase.regionserver.HRegionServer;
84  import org.apache.hadoop.hbase.regionserver.wal.HLog;
85  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
86  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
87  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
88  import org.apache.hadoop.hbase.util.Bytes;
89  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
90  import org.apache.hadoop.hbase.util.FSUtils;
91  import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
92  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
93  import org.apache.hadoop.hbase.util.Threads;
94  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
95  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
96  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
97  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
98  import org.apache.hadoop.hdfs.MiniDFSCluster;
99  import org.apache.zookeeper.KeeperException;
100 import org.junit.After;
101 import org.junit.AfterClass;
102 import org.junit.Assert;
103 import org.junit.Before;
104 import org.junit.BeforeClass;
105 import org.junit.Test;
106 import org.junit.experimental.categories.Category;
107 
108 @Category(LargeTests.class)
109 public class TestDistributedLogSplitting {
110   private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
111   static {
112     // Uncomment the following line if more verbosity is needed for
113     // debugging (see HBASE-12285 for details).
114     //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
115 
116     // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this
117     // turns it off for this test.  TODO: Figure out why scr breaks recovery.
118     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
119 
120   }
121 
122   // Start a cluster with 2 masters and 6 regionservers
123   static final int NUM_MASTERS = 2;
124   static final int NUM_RS = 6;
125 
126   MiniHBaseCluster cluster;
127   HMaster master;
128   Configuration conf;
129   static Configuration originalConf;
130   static HBaseTestingUtility TEST_UTIL;
131   static MiniDFSCluster dfsCluster;
132   static MiniZooKeeperCluster zkCluster;
133 
134   @BeforeClass
135   public static void setup() throws Exception {
136     TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
137     dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
138     zkCluster = TEST_UTIL.startMiniZKCluster();
139     originalConf = TEST_UTIL.getConfiguration();
140   }
141 
142   @AfterClass
143   public static void tearDown() throws IOException {
144     TEST_UTIL.shutdownMiniZKCluster();
145     TEST_UTIL.shutdownMiniDFSCluster();
146   }
147 
148   private void startCluster(int num_rs) throws Exception {
149     SplitLogCounters.resetCounters();
150     LOG.info("Starting cluster");
151     conf.getLong("hbase.splitlog.max.resubmit", 0);
152     // Make the failure test faster
153     conf.setInt("zookeeper.recovery.retry", 0);
154     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
155     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
156     conf.setInt("hbase.regionserver.wal.max.splitters", 3);
157     conf.setInt("hfile.format.version", 3);
158     conf.setInt("hbase.master.maximum.ping.server.attempts", 3);
159     TEST_UTIL.shutdownMiniHBaseCluster();
160     TEST_UTIL = new HBaseTestingUtility(conf);
161     TEST_UTIL.setDFSCluster(dfsCluster);
162     TEST_UTIL.setZkCluster(zkCluster);
163     TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
164     cluster = TEST_UTIL.getHBaseCluster();
165     LOG.info("Waiting for active/ready master");
166     cluster.waitForActiveAndReadyMaster();
167     master = cluster.getMaster();
168     while (cluster.getLiveRegionServerThreads().size() < num_rs) {
169       Threads.sleep(1);
170     }
171   }
172 
173   @Before
174   public void before() throws Exception {
175     // refresh configuration
176     conf = HBaseConfiguration.create(originalConf);
177   }
178 
179   @After
180   public void after() throws Exception {
181     try {
182       if (TEST_UTIL.getHBaseCluster() != null) {
183         for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
184           mt.getMaster().abort("closing...", null);
185         }
186       }
187       TEST_UTIL.shutdownMiniHBaseCluster();
188     } finally {
189       TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
190       ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
191     }
192   }
193 
194   @Test (timeout=300000)
195   public void testRecoveredEdits() throws Exception {
196     LOG.info("testRecoveredEdits");
197     conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
198     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
199     startCluster(NUM_RS);
200 
201     final int NUM_LOG_LINES = 1000;
202     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
203     // turn off load balancing to prevent regions from moving around otherwise
204     // they will consume recovered.edits
205     master.balanceSwitch(false);
206     FileSystem fs = master.getMasterFileSystem().getFileSystem();
207 
208     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
209 
210     Path rootdir = FSUtils.getRootDir(conf);
211 
212     installTable(new ZooKeeperWatcher(conf, "table-creation", null),
213         "table", "family", 40);
214     TableName table = TableName.valueOf("table");
215     List<HRegionInfo> regions = null;
216     HRegionServer hrs = null;
217     for (int i = 0; i < NUM_RS; i++) {
218       boolean foundRs = false;
219       hrs = rsts.get(i).getRegionServer();
220       regions = ProtobufUtil.getOnlineRegions(hrs);
221       for (HRegionInfo region : regions) {
222         if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
223           foundRs = true;
224           break;
225         }
226       }
227       if (foundRs) break;
228     }
229     final Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(hrs
230         .getServerName().toString()));
231 
232     LOG.info("#regions = " + regions.size());
233     Iterator<HRegionInfo> it = regions.iterator();
234     while (it.hasNext()) {
235       HRegionInfo region = it.next();
236       if (region.getTable().getNamespaceAsString()
237           .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
238         it.remove();
239       }
240     }
241     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
242 
243     slm.splitLogDistributed(logDir);
244 
245     int count = 0;
246     for (HRegionInfo hri : regions) {
247 
248       Path tdir = FSUtils.getTableDir(rootdir, table);
249       @SuppressWarnings("deprecation")
250       Path editsdir =
251         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
252       LOG.debug("checking edits dir " + editsdir);
253       FileStatus[] files = fs.listStatus(editsdir);
254       assertTrue(files.length > 1);
255       for (int i = 0; i < files.length; i++) {
256         int c = countHLog(files[i].getPath(), fs, conf);
257         count += c;
258       }
259       LOG.info(count + " edits in " + files.length + " recovered edits files.");
260     }
261 
262     // check that the log file is moved
263     assertFalse(fs.exists(logDir));
264 
265     assertEquals(NUM_LOG_LINES, count);
266   }
267 
268   @Test(timeout = 300000)
269   public void testLogReplayWithNonMetaRSDown() throws Exception {
270     LOG.info("testLogReplayWithNonMetaRSDown");
271     conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
272     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
273     startCluster(NUM_RS);
274     final int NUM_REGIONS_TO_CREATE = 40;
275     final int NUM_LOG_LINES = 1000;
276     // turn off load balancing to prevent regions from moving around otherwise
277     // they will consume recovered.edits
278     master.balanceSwitch(false);
279 
280     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
281     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
282 
283     HRegionServer hrs = findRSToKill(false, "table");
284     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
285     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
286 
287     // wait for abort completes
288     this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
289     ht.close();
290     zkw.close();
291   }
292 
293   private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
294     private boolean isDups = false;
295     private LinkedList<Long> nonces = new LinkedList<Long>();
296 
297     public void startDups() {
298       isDups = true;
299     }
300 
301     @Override
302     public long newNonce() {
303       long nonce = isDups ? nonces.removeFirst() : super.newNonce();
304       if (!isDups) {
305         nonces.add(nonce);
306       }
307       return nonce;
308     }
309   }
310 
311   @Test(timeout = 300000)
312   public void testNonceRecovery() throws Exception {
313     LOG.info("testNonceRecovery");
314     final String TABLE_NAME = "table";
315     final String FAMILY_NAME = "family";
316     final int NUM_REGIONS_TO_CREATE = 40;
317 
318     conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
319     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
320     startCluster(NUM_RS);
321     master.balanceSwitch(false);
322 
323     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
324     HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
325     NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
326     NonceGenerator oldNg =
327         HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), ng);
328 
329     try {
330       List<Increment> reqs = new ArrayList<Increment>();
331       for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
332         HRegionServer hrs = rst.getRegionServer();
333         List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
334         for (HRegionInfo hri : hris) {
335           if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
336             byte[] key = hri.getStartKey();
337             if (key == null || key.length == 0) {
338               key = Bytes.copy(hri.getEndKey());
339               --(key[key.length - 1]);
340             }
341             Increment incr = new Increment(key);
342             incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
343             ht.increment(incr);
344             reqs.add(incr);
345           }
346         }
347       }
348 
349       HRegionServer hrs = findRSToKill(false, "table");
350       abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
351       ng.startDups();
352       for (Increment incr : reqs) {
353         try {
354           ht.increment(incr);
355           fail("should have thrown");
356         } catch (OperationConflictException ope) {
357           LOG.debug("Caught as expected: " + ope.getMessage());
358         }
359       }
360     } finally {
361       HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
362       ht.close();
363       zkw.close();
364     }
365   }
366 
367   @Test(timeout = 300000)
368   public void testLogReplayWithMetaRSDown() throws Exception {
369     LOG.info("testRecoveredEditsReplayWithMetaRSDown");
370     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
371     startCluster(NUM_RS);
372     final int NUM_REGIONS_TO_CREATE = 40;
373     final int NUM_LOG_LINES = 1000;
374     // turn off load balancing to prevent regions from moving around otherwise
375     // they will consume recovered.edits
376     master.balanceSwitch(false);
377 
378     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
379     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
380 
381     HRegionServer hrs = findRSToKill(true, "table");
382     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
383     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
384 
385     this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
386     ht.close();
387     zkw.close();
388   }
389 
390   private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw,
391       final int numRegions, final int numofLines) throws Exception {
392 
393     abortRSAndWaitForRecovery(hrs, zkw, numRegions);
394     assertEquals(numofLines, TEST_UTIL.countRows(ht));
395   }
396 
397   private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
398       final int numRegions) throws Exception {
399     final MiniHBaseCluster tmpCluster = this.cluster;
400 
401     // abort RS
402     LOG.info("Aborting region server: " + hrs.getServerName());
403     hrs.abort("testing");
404 
405     // wait for abort completes
406     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
407       @Override
408       public boolean evaluate() throws Exception {
409         return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
410       }
411     });
412 
413     // wait for regions come online
414     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
415       @Override
416       public boolean evaluate() throws Exception {
417         return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 1));
418       }
419     });
420 
421     // wait for all regions are fully recovered
422     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
423       @Override
424       public boolean evaluate() throws Exception {
425         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
426           zkw.recoveringRegionsZNode, false);
427         return (recoveringRegions != null && recoveringRegions.size() == 0);
428       }
429     });
430   }
431 
432   @Test(timeout = 300000)
433   public void testMasterStartsUpWithLogSplittingWork() throws Exception {
434     LOG.info("testMasterStartsUpWithLogSplittingWork");
435     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
436     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
437     startCluster(NUM_RS);
438 
439     final int NUM_REGIONS_TO_CREATE = 40;
440     final int NUM_LOG_LINES = 1000;
441     // turn off load balancing to prevent regions from moving around otherwise
442     // they will consume recovered.edits
443     master.balanceSwitch(false);
444 
445     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
446     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
447     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
448 
449     HRegionServer hrs = findRSToKill(false, "table");
450     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
451     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
452 
453     // abort master
454     abortMaster(cluster);
455 
456     // abort RS
457     LOG.info("Aborting region server: " + hrs.getServerName());
458     hrs.abort("testing");
459 
460     // wait for abort completes
461     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
462       @Override
463       public boolean evaluate() throws Exception {
464         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
465       }
466     });
467 
468     Thread.sleep(2000);
469     LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
470 
471     startMasterAndWaitUntilLogSplit(cluster);
472 
473     // wait for abort completes
474     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
475       @Override
476       public boolean evaluate() throws Exception {
477         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
478       }
479     });
480 
481     LOG.info("Current Open Regions After Master Node Starts Up:"
482         + getAllOnlineRegions(cluster).size());
483 
484     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
485 
486     ht.close();
487     zkw.close();
488   }
489 
490   @Test(timeout = 300000)
491   public void testMasterStartsUpWithLogReplayWork() throws Exception {
492     LOG.info("testMasterStartsUpWithLogReplayWork");
493     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
494     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
495     startCluster(NUM_RS);
496 
497     final int NUM_REGIONS_TO_CREATE = 40;
498     final int NUM_LOG_LINES = 1000;
499     // turn off load balancing to prevent regions from moving around otherwise
500     // they will consume recovered.edits
501     master.balanceSwitch(false);
502 
503     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
504     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
505     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
506 
507     HRegionServer hrs = findRSToKill(false, "table");
508     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
509     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
510 
511     // abort master
512     abortMaster(cluster);
513 
514     // abort RS
515     LOG.info("Aborting region server: " + hrs.getServerName());
516     hrs.abort("testing");
517 
518     // wait for the RS dies
519     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
520       @Override
521       public boolean evaluate() throws Exception {
522         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
523       }
524     });
525 
526     Thread.sleep(2000);
527     LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
528 
529     startMasterAndWaitUntilLogSplit(cluster);
530 
531     // wait for all regions are fully recovered
532     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
533       @Override
534       public boolean evaluate() throws Exception {
535         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
536           zkw.recoveringRegionsZNode, false);
537         return (recoveringRegions != null && recoveringRegions.size() == 0);
538       }
539     });
540 
541     LOG.info("Current Open Regions After Master Node Starts Up:"
542         + getAllOnlineRegions(cluster).size());
543 
544     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
545 
546     ht.close();
547     zkw.close();
548   }
549 
550 
551   @Test(timeout = 300000)
552   public void testLogReplayTwoSequentialRSDown() throws Exception {
553     LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
554     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
555     startCluster(NUM_RS);
556     final int NUM_REGIONS_TO_CREATE = 40;
557     final int NUM_LOG_LINES = 1000;
558     // turn off load balancing to prevent regions from moving around otherwise
559     // they will consume recovered.edits
560     master.balanceSwitch(false);
561 
562     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
563     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
564     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
565 
566     List<HRegionInfo> regions = null;
567     HRegionServer hrs1 = findRSToKill(false, "table");
568     regions = ProtobufUtil.getOnlineRegions(hrs1);
569 
570     makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
571 
572     // abort RS1
573     LOG.info("Aborting region server: " + hrs1.getServerName());
574     hrs1.abort("testing");
575 
576     // wait for abort completes
577     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
578       @Override
579       public boolean evaluate() throws Exception {
580         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
581       }
582     });
583 
584     // wait for regions come online
585     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
586       @Override
587       public boolean evaluate() throws Exception {
588         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
589       }
590     });
591 
592     // sleep a little bit in order to interrupt recovering in the middle
593     Thread.sleep(300);
594     // abort second region server
595     rsts = cluster.getLiveRegionServerThreads();
596     HRegionServer hrs2 = rsts.get(0).getRegionServer();
597     LOG.info("Aborting one more region server: " + hrs2.getServerName());
598     hrs2.abort("testing");
599 
600     // wait for abort completes
601     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
602       @Override
603       public boolean evaluate() throws Exception {
604         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
605       }
606     });
607 
608     // wait for regions come online
609     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
610       @Override
611       public boolean evaluate() throws Exception {
612         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
613       }
614     });
615 
616     // wait for all regions are fully recovered
617     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
618       @Override
619       public boolean evaluate() throws Exception {
620         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
621           zkw.recoveringRegionsZNode, false);
622         return (recoveringRegions != null && recoveringRegions.size() == 0);
623       }
624     });
625 
626     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
627     ht.close();
628     zkw.close();
629   }
630 
631   @Test(timeout = 300000)
632   public void testMarkRegionsRecoveringInZK() throws Exception {
633     LOG.info("testMarkRegionsRecoveringInZK");
634     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
635     startCluster(NUM_RS);
636     master.balanceSwitch(false);
637     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
638     final ZooKeeperWatcher zkw = master.getZooKeeperWatcher();
639     HTable ht = installTable(zkw, "table", "family", 40);
640     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
641 
642     Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
643     HRegionInfo region = null;
644     HRegionServer hrs = null;
645     ServerName firstFailedServer = null;
646     ServerName secondFailedServer = null;
647     for (int i = 0; i < NUM_RS; i++) {
648       hrs = rsts.get(i).getRegionServer();
649       List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
650       if (regions.isEmpty()) continue;
651       region = regions.get(0);
652       regionSet.add(region);
653       firstFailedServer = hrs.getServerName();
654       secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
655       break;
656     }
657 
658     slm.markRegionsRecoveringInZK(firstFailedServer, regionSet);
659     slm.markRegionsRecoveringInZK(secondFailedServer, regionSet);
660 
661     List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
662       ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
663 
664     assertEquals(recoveringRegions.size(), 2);
665 
666     // wait for splitLogWorker to mark them up because there is no WAL files recorded in ZK
667     final HRegionServer tmphrs = hrs;
668     TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
669       @Override
670       public boolean evaluate() throws Exception {
671         return (tmphrs.getRecoveringRegions().size() == 0);
672       }
673     });
674     ht.close();
675     zkw.close();
676   }
677 
678   @Test(timeout = 300000)
679   public void testReplayCmd() throws Exception {
680     LOG.info("testReplayCmd");
681     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
682     startCluster(NUM_RS);
683     final int NUM_REGIONS_TO_CREATE = 40;
684     // turn off load balancing to prevent regions from moving around otherwise
685     // they will consume recovered.edits
686     master.balanceSwitch(false);
687 
688     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
689     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
690     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
691 
692     List<HRegionInfo> regions = null;
693     HRegionServer hrs = null;
694     for (int i = 0; i < NUM_RS; i++) {
695       boolean isCarryingMeta = false;
696       hrs = rsts.get(i).getRegionServer();
697       regions = ProtobufUtil.getOnlineRegions(hrs);
698       for (HRegionInfo region : regions) {
699         if (region.isMetaRegion()) {
700           isCarryingMeta = true;
701           break;
702         }
703       }
704       if (isCarryingMeta) {
705         continue;
706       }
707       if (regions.size() > 0) break;
708     }
709 
710     this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
711     String originalCheckSum = TEST_UTIL.checksumRows(ht);
712 
713     // abort RA and trigger replay
714     abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
715 
716     assertEquals("Data should remain after reopening of regions", originalCheckSum,
717       TEST_UTIL.checksumRows(ht));
718 
719     ht.close();
720     zkw.close();
721   }
722 
723   @Test(timeout = 300000)
724   public void testLogReplayForDisablingTable() throws Exception {
725     LOG.info("testLogReplayForDisablingTable");
726     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
727     startCluster(NUM_RS);
728     final int NUM_REGIONS_TO_CREATE = 40;
729     final int NUM_LOG_LINES = 1000;
730 
731     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
732     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
733     HTable disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
734     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
735 
736     // turn off load balancing to prevent regions from moving around otherwise
737     // they will consume recovered.edits
738     master.balanceSwitch(false);
739 
740     List<HRegionInfo> regions = null;
741     HRegionServer hrs = null;
742     boolean hasRegionsForBothTables = false;
743     String tableName = null;
744     for (int i = 0; i < NUM_RS; i++) {
745       tableName = null;
746       hasRegionsForBothTables = false;
747       boolean isCarryingSystem = false;
748       hrs = rsts.get(i).getRegionServer();
749       regions = ProtobufUtil.getOnlineRegions(hrs);
750       for (HRegionInfo region : regions) {
751         if (region.getTable().isSystemTable()) {
752           isCarryingSystem = true;
753           break;
754         }
755         if (tableName != null &&
756             !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
757           // make sure that we find a RS has online regions for both "table" and "disableTable"
758           hasRegionsForBothTables = true;
759           break;
760         } else if (tableName == null) {
761           tableName = region.getTable().getNameAsString();
762         }
763       }
764       if (isCarryingSystem) {
765         continue;
766       }
767       if (hasRegionsForBothTables) {
768         break;
769       }
770     }
771 
772     // make sure we found a good RS
773     Assert.assertTrue(hasRegionsForBothTables);
774 
775     LOG.info("#regions = " + regions.size());
776     Iterator<HRegionInfo> it = regions.iterator();
777     while (it.hasNext()) {
778       HRegionInfo region = it.next();
779       if (region.isMetaTable()) {
780         it.remove();
781       }
782     }
783     makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
784     makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
785 
786     LOG.info("Disabling table\n");
787     TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable"));
788 
789     // abort RS
790     LOG.info("Aborting region server: " + hrs.getServerName());
791     hrs.abort("testing");
792 
793     // wait for abort completes
794     TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
795       @Override
796       public boolean evaluate() throws Exception {
797         return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
798       }
799     });
800 
801     // wait for regions come online
802     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
803       @Override
804       public boolean evaluate() throws Exception {
805         return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
806       }
807     });
808 
809     // wait for all regions are fully recovered
810     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
811       @Override
812       public boolean evaluate() throws Exception {
813         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
814           zkw.recoveringRegionsZNode, false);
815         ServerManager serverManager = master.getServerManager();
816         return (!serverManager.areDeadServersInProgress() &&
817             recoveringRegions != null && recoveringRegions.size() == 0);
818       }
819     });
820 
821     int count = 0;
822     FileSystem fs = master.getMasterFileSystem().getFileSystem();
823     Path rootdir = FSUtils.getRootDir(conf);
824     Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
825     for (HRegionInfo hri : regions) {
826       @SuppressWarnings("deprecation")
827       Path editsdir =
828         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
829       LOG.debug("checking edits dir " + editsdir);
830       if(!fs.exists(editsdir)) continue;
831       FileStatus[] files = fs.listStatus(editsdir);
832       if(files != null) {
833         for(FileStatus file : files) {
834           int c = countHLog(file.getPath(), fs, conf);
835           count += c;
836           LOG.info(c + " edits in " + file.getPath());
837         }
838       }
839     }
840 
841     LOG.info("Verify edits in recovered.edits files");
842     assertEquals(NUM_LOG_LINES, count);
843     LOG.info("Verify replayed edits");
844     assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
845 
846     // clean up
847     for (HRegionInfo hri : regions) {
848       @SuppressWarnings("deprecation")
849       Path editsdir =
850         HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
851       fs.delete(editsdir, true);
852     }
853     disablingHT.close();
854     ht.close();
855     zkw.close();
856   }
857 
858   @Test(timeout = 300000)
859   public void testDisallowWritesInRecovering() throws Exception {
860     LOG.info("testDisallowWritesInRecovering");
861     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
862     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
863     conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
864     startCluster(NUM_RS);
865     final int NUM_REGIONS_TO_CREATE = 40;
866     // turn off load balancing to prevent regions from moving around otherwise
867     // they will consume recovered.edits
868     master.balanceSwitch(false);
869 
870     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
871     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
872     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
873     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
874 
875     Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
876     HRegionInfo region = null;
877     HRegionServer hrs = null;
878     HRegionServer dstRS = null;
879     for (int i = 0; i < NUM_RS; i++) {
880       hrs = rsts.get(i).getRegionServer();
881       List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
882       if (regions.isEmpty()) continue;
883       region = regions.get(0);
884       regionSet.add(region);
885       dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
886       break;
887     }
888 
889     slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet);
890     // move region in order for the region opened in recovering state
891     final HRegionInfo hri = region;
892     final HRegionServer tmpRS = dstRS;
893     TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
894       Bytes.toBytes(dstRS.getServerName().getServerName()));
895     // wait for region move completes
896     final RegionStates regionStates =
897         TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
898     TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
899       @Override
900       public boolean evaluate() throws Exception {
901         ServerName sn = regionStates.getRegionServerOfRegion(hri);
902         return (sn != null && sn.equals(tmpRS.getServerName()));
903       }
904     });
905 
906     try {
907       byte[] key = region.getStartKey();
908       if (key == null || key.length == 0) {
909         key = new byte[] { 0, 0, 0, 0, 1 };
910       }
911       ht.setAutoFlush(true, true);
912       Put put = new Put(key);
913       put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
914       ht.put(put);
915       ht.close();
916     } catch (IOException ioe) {
917       Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
918       RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
919       boolean foundRegionInRecoveryException = false;
920       for (Throwable t : re.getCauses()) {
921         if (t instanceof RegionInRecoveryException) {
922           foundRegionInRecoveryException = true;
923           break;
924         }
925       }
926       Assert.assertTrue(
927         "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
928         foundRegionInRecoveryException);
929     }
930 
931     zkw.close();
932   }
933 
934   /**
935    * The original intention of this test was to force an abort of a region
936    * server and to make sure that the failure path in the region servers is
937    * properly evaluated. But it is difficult to ensure that the region server
938    * doesn't finish the log splitting before it aborts. Also now, there is
939    * this code path where the master will preempt the region server when master
940    * detects that the region server has aborted.
941    * @throws Exception
942    */
943   @Test (timeout=300000)
944   public void testWorkerAbort() throws Exception {
945     LOG.info("testWorkerAbort");
946     startCluster(3);
947     final int NUM_LOG_LINES = 10000;
948     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
949     FileSystem fs = master.getMasterFileSystem().getFileSystem();
950 
951     final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
952     HRegionServer hrs = findRSToKill(false, "table");
953     Path rootdir = FSUtils.getRootDir(conf);
954     final Path logDir = new Path(rootdir,
955         HLogUtil.getHLogDirectoryName(hrs.getServerName().toString()));
956 
957     installTable(new ZooKeeperWatcher(conf, "table-creation", null),
958         "table", "family", 40);
959 
960     makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table", "family", NUM_LOG_LINES,
961       100);
962 
963     new Thread() {
964       public void run() {
965         waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
966         for (RegionServerThread rst : rsts) {
967           rst.getRegionServer().abort("testing");
968           break;
969         }
970       }
971     }.start();
972     // slm.splitLogDistributed(logDir);
973     FileStatus[] logfiles = fs.listStatus(logDir);
974     TaskBatch batch = new TaskBatch();
975     slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
976     //waitForCounter but for one of the 2 counters
977     long curt = System.currentTimeMillis();
978     long waitTime = 80000;
979     long endt = curt + waitTime;
980     while (curt < endt) {
981       if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
982           tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
983           tot_wkr_preempt_task.get()) == 0) {
984         Thread.yield();
985         curt = System.currentTimeMillis();
986       } else {
987         assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
988             tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
989             tot_wkr_preempt_task.get()));
990         return;
991       }
992     }
993     fail("none of the following counters went up in " + waitTime +
994         " milliseconds - " +
995         "tot_wkr_task_resigned, tot_wkr_task_err, " +
996         "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
997         "tot_wkr_preempt_task");
998   }
999 
1000   @Test (timeout=300000)
1001   public void testThreeRSAbort() throws Exception {
1002     LOG.info("testThreeRSAbort");
1003     final int NUM_REGIONS_TO_CREATE = 40;
1004     final int NUM_ROWS_PER_REGION = 100;
1005 
1006     startCluster(NUM_RS); // NUM_RS=6.
1007 
1008     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
1009         "distributed log splitting test", null);
1010 
1011     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1012     populateDataInTable(NUM_ROWS_PER_REGION, "family");
1013 
1014 
1015     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1016     assertEquals(NUM_RS, rsts.size());
1017     rsts.get(0).getRegionServer().abort("testing");
1018     rsts.get(1).getRegionServer().abort("testing");
1019     rsts.get(2).getRegionServer().abort("testing");
1020 
1021     long start = EnvironmentEdgeManager.currentTimeMillis();
1022     while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
1023       if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
1024         assertTrue(false);
1025       }
1026       Thread.sleep(200);
1027     }
1028 
1029     start = EnvironmentEdgeManager.currentTimeMillis();
1030     while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 1)) {
1031       if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
1032         assertTrue("Timedout", false);
1033       }
1034       Thread.sleep(200);
1035     }
1036 
1037     // wait for all regions are fully recovered
1038     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
1039       @Override
1040       public boolean evaluate() throws Exception {
1041         List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
1042           zkw.recoveringRegionsZNode, false);
1043         return (recoveringRegions != null && recoveringRegions.size() == 0);
1044       }
1045     });
1046 
1047     assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
1048         TEST_UTIL.countRows(ht));
1049     ht.close();
1050     zkw.close();
1051   }
1052 
1053 
1054 
1055   @Test(timeout=30000)
1056   public void testDelayedDeleteOnFailure() throws Exception {
1057     LOG.info("testDelayedDeleteOnFailure");
1058     startCluster(1);
1059     final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
1060     final FileSystem fs = master.getMasterFileSystem().getFileSystem();
1061     final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
1062     fs.mkdirs(logDir);
1063     ExecutorService executor = null;
1064     try {
1065       final Path corruptedLogFile = new Path(logDir, "x");
1066       FSDataOutputStream out;
1067       out = fs.create(corruptedLogFile);
1068       out.write(0);
1069       out.write(Bytes.toBytes("corrupted bytes"));
1070       out.close();
1071       slm.ignoreZKDeleteForTesting = true;
1072       executor = Executors.newSingleThreadExecutor();
1073       Runnable runnable = new Runnable() {
1074        @Override
1075        public void run() {
1076           try {
1077             // since the logDir is a fake, corrupted one, so the split log worker
1078             // will finish it quickly with error, and this call will fail and throw
1079             // an IOException.
1080             slm.splitLogDistributed(logDir);
1081           } catch (IOException ioe) {
1082             try {
1083               assertTrue(fs.exists(corruptedLogFile));
1084               // this call will block waiting for the task to be removed from the
1085               // tasks map which is not going to happen since ignoreZKDeleteForTesting
1086               // is set to true, until it is interrupted.
1087               slm.splitLogDistributed(logDir);
1088             } catch (IOException e) {
1089               assertTrue(Thread.currentThread().isInterrupted());
1090               return;
1091             }
1092             fail("did not get the expected IOException from the 2nd call");
1093           }
1094           fail("did not get the expected IOException from the 1st call");
1095         }
1096       };
1097       Future<?> result = executor.submit(runnable);
1098       try {
1099         result.get(2000, TimeUnit.MILLISECONDS);
1100       } catch (TimeoutException te) {
1101         // it is ok, expected.
1102       }
1103       waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
1104       executor.shutdownNow();
1105       executor = null;
1106 
1107       // make sure the runnable is finished with no exception thrown.
1108       result.get();
1109     } finally {
1110       if (executor != null) {
1111         // interrupt the thread in case the test fails in the middle.
1112         // it has no effect if the thread is already terminated.
1113         executor.shutdownNow();
1114       }
1115       fs.delete(logDir, true);
1116     }
1117   }
1118 
1119   @Test(timeout = 300000)
1120   public void testMetaRecoveryInZK() throws Exception {
1121     LOG.info("testMetaRecoveryInZK");
1122     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1123     startCluster(NUM_RS);
1124 
1125     // turn off load balancing to prevent regions from moving around otherwise
1126     // they will consume recovered.edits
1127     master.balanceSwitch(false);
1128     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1129     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1130 
1131     // only testing meta recovery in ZK operation
1132     HRegionServer hrs = findRSToKill(true, null);
1133     List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
1134 
1135     LOG.info("#regions = " + regions.size());
1136     Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
1137     tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
1138     master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
1139     Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
1140     userRegionSet.addAll(regions);
1141     master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
1142     boolean isMetaRegionInRecovery = false;
1143     List<String> recoveringRegions =
1144         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1145     for (String curEncodedRegionName : recoveringRegions) {
1146       if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1147         isMetaRegionInRecovery = true;
1148         break;
1149       }
1150     }
1151     assertTrue(isMetaRegionInRecovery);
1152 
1153     master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
1154 
1155     isMetaRegionInRecovery = false;
1156     recoveringRegions =
1157         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1158     for (String curEncodedRegionName : recoveringRegions) {
1159       if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1160         isMetaRegionInRecovery = true;
1161         break;
1162       }
1163     }
1164     // meta region should be recovered
1165     assertFalse(isMetaRegionInRecovery);
1166     zkw.close();
1167   }
1168 
1169   @Test(timeout = 300000)
1170   public void testSameVersionUpdatesRecovery() throws Exception {
1171     LOG.info("testSameVersionUpdatesRecovery");
1172     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1173     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1174     startCluster(NUM_RS);
1175     final AtomicLong sequenceId = new AtomicLong(100);
1176     final int NUM_REGIONS_TO_CREATE = 40;
1177     final int NUM_LOG_LINES = 1000;
1178     // turn off load balancing to prevent regions from moving around otherwise
1179     // they will consume recovered.edits
1180     master.balanceSwitch(false);
1181 
1182     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1183     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1184     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1185 
1186     List<HRegionInfo> regions = null;
1187     HRegionServer hrs = null;
1188     for (int i = 0; i < NUM_RS; i++) {
1189       boolean isCarryingMeta = false;
1190       hrs = rsts.get(i).getRegionServer();
1191       regions = ProtobufUtil.getOnlineRegions(hrs);
1192       for (HRegionInfo region : regions) {
1193         if (region.isMetaRegion()) {
1194           isCarryingMeta = true;
1195           break;
1196         }
1197       }
1198       if (isCarryingMeta) {
1199         continue;
1200       }
1201       break;
1202     }
1203 
1204     LOG.info("#regions = " + regions.size());
1205     Iterator<HRegionInfo> it = regions.iterator();
1206     while (it.hasNext()) {
1207       HRegionInfo region = it.next();
1208       if (region.isMetaTable()
1209           || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1210         it.remove();
1211       }
1212     }
1213     if (regions.size() == 0) return;
1214     HRegionInfo curRegionInfo = regions.get(0);
1215     byte[] startRow = curRegionInfo.getStartKey();
1216     if (startRow == null || startRow.length == 0) {
1217       startRow = new byte[] { 0, 0, 0, 0, 1 };
1218     }
1219     byte[] row = Bytes.incrementBytes(startRow, 1);
1220     // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
1221     row = Arrays.copyOfRange(row, 3, 8);
1222     long value = 0;
1223     byte[] tableName = Bytes.toBytes("table");
1224     byte[] family = Bytes.toBytes("family");
1225     byte[] qualifier = Bytes.toBytes("c1");
1226     long timeStamp = System.currentTimeMillis();
1227     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1228     htd.addFamily(new HColumnDescriptor(family));
1229     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1230       WALEdit e = new WALEdit();
1231       value++;
1232       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1233       hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
1234         System.currentTimeMillis(), htd, sequenceId);
1235     }
1236     hrs.getWAL().sync();
1237     hrs.getWAL().close();
1238 
1239     // wait for abort completes
1240     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1241 
1242     // verify we got the last value
1243     LOG.info("Verification Starts...");
1244     Get g = new Get(row);
1245     Result r = ht.get(g);
1246     long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1247     assertEquals(value, theStoredVal);
1248 
1249     // after flush
1250     LOG.info("Verification after flush...");
1251     TEST_UTIL.getHBaseAdmin().flush(tableName);
1252     r = ht.get(g);
1253     theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1254     assertEquals(value, theStoredVal);
1255     ht.close();
1256   }
1257 
1258   @Test(timeout = 300000)
1259   public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
1260     LOG.info("testSameVersionUpdatesRecoveryWithWrites");
1261     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1262     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1263     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
1264     conf.setInt("hbase.hstore.compactionThreshold", 3);
1265     startCluster(NUM_RS);
1266     final AtomicLong sequenceId = new AtomicLong(100);
1267     final int NUM_REGIONS_TO_CREATE = 40;
1268     final int NUM_LOG_LINES = 1000;
1269     // turn off load balancing to prevent regions from moving around otherwise
1270     // they will consume recovered.edits
1271     master.balanceSwitch(false);
1272 
1273     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1274     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1275     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1276 
1277     List<HRegionInfo> regions = null;
1278     HRegionServer hrs = null;
1279     for (int i = 0; i < NUM_RS; i++) {
1280       boolean isCarryingMeta = false;
1281       hrs = rsts.get(i).getRegionServer();
1282       regions = ProtobufUtil.getOnlineRegions(hrs);
1283       for (HRegionInfo region : regions) {
1284         if (region.isMetaRegion()) {
1285           isCarryingMeta = true;
1286           break;
1287         }
1288       }
1289       if (isCarryingMeta) {
1290         continue;
1291       }
1292       break;
1293     }
1294 
1295     LOG.info("#regions = " + regions.size());
1296     Iterator<HRegionInfo> it = regions.iterator();
1297     while (it.hasNext()) {
1298       HRegionInfo region = it.next();
1299       if (region.isMetaTable()
1300           || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1301         it.remove();
1302       }
1303     }
1304     if (regions.size() == 0) return;
1305     HRegionInfo curRegionInfo = regions.get(0);
1306     byte[] startRow = curRegionInfo.getStartKey();
1307     if (startRow == null || startRow.length == 0) {
1308       startRow = new byte[] { 0, 0, 0, 0, 1 };
1309     }
1310     byte[] row = Bytes.incrementBytes(startRow, 1);
1311     // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
1312     row = Arrays.copyOfRange(row, 3, 8);
1313     long value = 0;
1314     final byte[] tableName = Bytes.toBytes("table");
1315     byte[] family = Bytes.toBytes("family");
1316     byte[] qualifier = Bytes.toBytes("c1");
1317     long timeStamp = System.currentTimeMillis();
1318     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1319     htd.addFamily(new HColumnDescriptor(family));
1320     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1321       WALEdit e = new WALEdit();
1322       value++;
1323       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1324       hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
1325         System.currentTimeMillis(), htd, sequenceId);
1326     }
1327     hrs.getWAL().sync();
1328     hrs.getWAL().close();
1329 
1330     // wait for abort completes
1331     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1332 
1333     // verify we got the last value
1334     LOG.info("Verification Starts...");
1335     Get g = new Get(row);
1336     Result r = ht.get(g);
1337     long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1338     assertEquals(value, theStoredVal);
1339 
1340     // after flush & compaction
1341     LOG.info("Verification after flush...");
1342     TEST_UTIL.getHBaseAdmin().flush(tableName);
1343     TEST_UTIL.getHBaseAdmin().compact(tableName);
1344 
1345     // wait for compaction completes
1346     TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
1347       @Override
1348       public boolean evaluate() throws Exception {
1349         return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
1350       }
1351     });
1352 
1353     r = ht.get(g);
1354     theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1355     assertEquals(value, theStoredVal);
1356     ht.close();
1357   }
1358 
1359   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
1360     return installTable(zkw, tname, fname, nrs, 0);
1361   }
1362 
1363   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
1364       int existingRegions) throws Exception {
1365     // Create a table with regions
1366     byte [] table = Bytes.toBytes(tname);
1367     byte [] family = Bytes.toBytes(fname);
1368     LOG.info("Creating table with " + nrs + " regions");
1369     HTable ht = TEST_UTIL.createTable(table, family);
1370     int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
1371     assertEquals(nrs, numRegions);
1372       LOG.info("Waiting for no more RIT\n");
1373     blockUntilNoRIT(zkw, master);
1374     // disable-enable cycle to get rid of table's dead regions left behind
1375     // by createMultiRegions
1376     LOG.debug("Disabling table\n");
1377     TEST_UTIL.getHBaseAdmin().disableTable(table);
1378     LOG.debug("Waiting for no more RIT\n");
1379     blockUntilNoRIT(zkw, master);
1380     NavigableSet<String> regions = getAllOnlineRegions(cluster);
1381     LOG.debug("Verifying only catalog and namespace regions are assigned\n");
1382     if (regions.size() != 2) {
1383       for (String oregion : regions)
1384         LOG.debug("Region still online: " + oregion);
1385     }
1386     assertEquals(2 + existingRegions, regions.size());
1387     LOG.debug("Enabling table\n");
1388     TEST_UTIL.getHBaseAdmin().enableTable(table);
1389     LOG.debug("Waiting for no more RIT\n");
1390     blockUntilNoRIT(zkw, master);
1391     LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
1392     regions = getAllOnlineRegions(cluster);
1393     assertEquals(numRegions + 2 + existingRegions, regions.size());
1394     return ht;
1395   }
1396 
1397   void populateDataInTable(int nrows, String fname) throws Exception {
1398     byte [] family = Bytes.toBytes(fname);
1399 
1400     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1401     assertEquals(NUM_RS, rsts.size());
1402 
1403     for (RegionServerThread rst : rsts) {
1404       HRegionServer hrs = rst.getRegionServer();
1405       List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
1406       for (HRegionInfo hri : hris) {
1407         if (hri.getTable().isSystemTable()) {
1408           continue;
1409         }
1410         LOG.debug("adding data to rs = " + rst.getName() +
1411             " region = "+ hri.getRegionNameAsString());
1412         HRegion region = hrs.getOnlineRegion(hri.getRegionName());
1413         assertTrue(region != null);
1414         putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
1415       }
1416     }
1417   }
1418 
1419   public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1420       int num_edits, int edit_size) throws IOException {
1421     makeHLog(log, regions, tname, fname, num_edits, edit_size, true);
1422   }
1423 
1424   public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1425       int num_edits, int edit_size, boolean closeLog) throws IOException {
1426     TableName fullTName = TableName.valueOf(tname);
1427     // remove root and meta region
1428     regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
1429     // using one sequenceId for edits across all regions is ok.
1430     final AtomicLong sequenceId = new AtomicLong(10);
1431 
1432 
1433     for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
1434       HRegionInfo regionInfo = iter.next();
1435       if(regionInfo.getTable().isSystemTable()) {
1436          iter.remove();
1437       }
1438     }
1439     HTableDescriptor htd = new HTableDescriptor(fullTName);
1440     byte[] family = Bytes.toBytes(fname);
1441     htd.addFamily(new HColumnDescriptor(family));
1442     byte[] value = new byte[edit_size];
1443 
1444     List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
1445     for (HRegionInfo region : regions) {
1446       if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
1447         continue;
1448       }
1449       hris.add(region);
1450     }
1451     LOG.info("Creating wal edits across " + hris.size() + " regions.");
1452     for (int i = 0; i < edit_size; i++) {
1453       value[i] = (byte) ('a' + (i % 26));
1454     }
1455     int n = hris.size();
1456     int[] counts = new int[n];
1457     if (n > 0) {
1458       for (int i = 0; i < num_edits; i += 1) {
1459         WALEdit e = new WALEdit();
1460         HRegionInfo curRegionInfo = hris.get(i % n);
1461         byte[] startRow = curRegionInfo.getStartKey();
1462         if (startRow == null || startRow.length == 0) {
1463           startRow = new byte[] { 0, 0, 0, 0, 1 };
1464         }
1465         byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
1466         row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
1467                                              // HBaseTestingUtility.createMultiRegions use 5 bytes
1468                                              // key
1469         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
1470         e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
1471         log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
1472         counts[i % n] += 1;
1473       }
1474     }
1475     log.sync();
1476     if(closeLog) {
1477       log.close();
1478     }
1479     for (int i = 0; i < n; i++) {
1480       LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
1481     }
1482     return;
1483   }
1484 
1485   private int countHLog(Path log, FileSystem fs, Configuration conf)
1486   throws IOException {
1487     int count = 0;
1488     HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1489     while (in.next() != null) {
1490       count++;
1491     }
1492     return count;
1493   }
1494 
1495   private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
1496   throws KeeperException, InterruptedException {
1497     ZKAssign.blockUntilNoRIT(zkw);
1498     master.assignmentManager.waitUntilNoRegionsInTransition(60000);
1499   }
1500 
1501   private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
1502       byte [] ...families)
1503   throws IOException {
1504     for(int i = 0; i < numRows; i++) {
1505       Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
1506       for(byte [] family : families) {
1507         put.add(family, qf, null);
1508       }
1509       region.put(put);
1510     }
1511   }
1512 
1513   /**
1514    * Load table with puts and deletes with expected values so that we can verify later
1515    */
1516   private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
1517     t.setAutoFlush(false, true);
1518     byte[] k = new byte[3];
1519 
1520     // add puts
1521     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1522       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1523         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1524           k[0] = b1;
1525           k[1] = b2;
1526           k[2] = b3;
1527           Put put = new Put(k);
1528           put.add(f, column, k);
1529           t.put(put);
1530         }
1531       }
1532     }
1533     t.flushCommits();
1534     // add deletes
1535     for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1536       k[0] = 'a';
1537       k[1] = 'a';
1538       k[2] = b3;
1539       Delete del = new Delete(k);
1540       t.delete(del);
1541     }
1542     t.flushCommits();
1543   }
1544 
1545   private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
1546       throws IOException {
1547     NavigableSet<String> online = new TreeSet<String>();
1548     for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
1549       for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
1550         online.add(region.getRegionNameAsString());
1551       }
1552     }
1553     return online;
1554   }
1555 
1556   private void waitForCounter(AtomicLong ctr, long oldval, long newval,
1557       long timems) {
1558     long curt = System.currentTimeMillis();
1559     long endt = curt + timems;
1560     while (curt < endt) {
1561       if (ctr.get() == oldval) {
1562         Thread.yield();
1563         curt = System.currentTimeMillis();
1564       } else {
1565         assertEquals(newval, ctr.get());
1566         return;
1567       }
1568     }
1569     assertTrue(false);
1570   }
1571 
1572   private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
1573     for (MasterThread mt : cluster.getLiveMasterThreads()) {
1574       if (mt.getMaster().isActiveMaster()) {
1575         mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
1576         mt.join();
1577         break;
1578       }
1579     }
1580     LOG.debug("Master is aborted");
1581   }
1582 
1583   private void startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
1584       throws IOException, InterruptedException {
1585     cluster.startMaster();
1586     HMaster master = cluster.getMaster();
1587     while (!master.isInitialized()) {
1588       Thread.sleep(100);
1589     }
1590     ServerManager serverManager = master.getServerManager();
1591     while (serverManager.areDeadServersInProgress()) {
1592       Thread.sleep(100);
1593     }
1594   }
1595 
1596   /**
1597    * Find a RS that has regions of a table.
1598    * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
1599    * @param tableName
1600    * @return
1601    * @throws Exception
1602    */
1603   private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
1604     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1605     int numOfRSs = rsts.size();
1606     List<HRegionInfo> regions = null;
1607     HRegionServer hrs = null;
1608 
1609     for (int i = 0; i < numOfRSs; i++) {
1610       boolean isCarryingMeta = false;
1611       boolean foundTableRegion = false;
1612       hrs = rsts.get(i).getRegionServer();
1613       regions = ProtobufUtil.getOnlineRegions(hrs);
1614       for (HRegionInfo region : regions) {
1615         if (region.isMetaRegion()) {
1616           isCarryingMeta = true;
1617         }
1618         if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
1619           foundTableRegion = true;
1620         }
1621         if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
1622           break;
1623         }
1624       }
1625       if (isCarryingMeta && hasMetaRegion) {
1626         // clients ask for a RS with META
1627         if (!foundTableRegion) {
1628           final HRegionServer destRS = hrs;
1629           // the RS doesn't have regions of the specified table so we need move one to this RS
1630           List<HRegionInfo> tableRegions =
1631               TEST_UTIL.getHBaseAdmin().getTableRegions(Bytes.toBytes(tableName));
1632           final HRegionInfo hri = tableRegions.get(0);
1633           TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
1634             Bytes.toBytes(destRS.getServerName().getServerName()));
1635           // wait for region move completes
1636           final RegionStates regionStates =
1637               TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
1638           TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
1639             @Override
1640             public boolean evaluate() throws Exception {
1641               ServerName sn = regionStates.getRegionServerOfRegion(hri);
1642               return (sn != null && sn.equals(destRS.getServerName()));
1643             }
1644           });
1645         }
1646         return hrs;
1647       } else if (hasMetaRegion || isCarryingMeta) {
1648         continue;
1649       }
1650       if (foundTableRegion) break;
1651     }
1652 
1653     return hrs;
1654   }
1655 
1656 }