19 package org.apache.hadoop.hbase.master;
20
21 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
28 import static org.junit.Assert.*;
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.NavigableSet;
37 import java.util.Set;
38 import java.util.TreeSet;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.concurrent.atomic.AtomicLong;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FSDataOutputStream;
50 import org.apache.hadoop.fs.FileStatus;
51 import org.apache.hadoop.fs.FileSystem;
52 import org.apache.hadoop.fs.Path;
53 import org.apache.hadoop.hbase.HColumnDescriptor;
54 import org.apache.hadoop.hbase.TableName;
55 import org.apache.hadoop.hbase.HBaseConfiguration;
56 import org.apache.hadoop.hbase.HBaseTestingUtility;
57 import org.apache.hadoop.hbase.HConstants;
58 import org.apache.hadoop.hbase.HRegionInfo;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.KeyValue;
61 import org.apache.hadoop.hbase.testclassification.LargeTests;
62 import org.apache.hadoop.hbase.MiniHBaseCluster;
63 import org.apache.hadoop.hbase.NamespaceDescriptor;
64 import org.apache.hadoop.hbase.ServerName;
65 import org.apache.hadoop.hbase.SplitLogCounters;
66 import org.apache.hadoop.hbase.Waiter;
67 import org.apache.hadoop.hbase.client.Delete;
68 import org.apache.hadoop.hbase.client.Get;
69 import org.apache.hadoop.hbase.client.HConnectionManager;
70 import org.apache.hadoop.hbase.client.HTable;
71 import org.apache.hadoop.hbase.client.Increment;
72 import org.apache.hadoop.hbase.client.NonceGenerator;
73 import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
74 import org.apache.hadoop.hbase.client.Put;
75 import org.apache.hadoop.hbase.client.Result;
76 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
77 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
78 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
79 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
80 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
81 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
82 import org.apache.hadoop.hbase.regionserver.HRegion;
83 import org.apache.hadoop.hbase.regionserver.HRegionServer;
84 import org.apache.hadoop.hbase.regionserver.wal.HLog;
85 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
86 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
87 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
88 import org.apache.hadoop.hbase.util.Bytes;
89 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
90 import org.apache.hadoop.hbase.util.FSUtils;
91 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
92 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
93 import org.apache.hadoop.hbase.util.Threads;
94 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
95 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
96 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
97 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
98 import org.apache.hadoop.hdfs.MiniDFSCluster;
99 import org.apache.zookeeper.KeeperException;
100 import org.junit.After;
101 import org.junit.AfterClass;
102 import org.junit.Assert;
103 import org.junit.Before;
104 import org.junit.BeforeClass;
105 import org.junit.Test;
106 import org.junit.experimental.categories.Category;
107
108 @Category(LargeTests.class)
109 public class TestDistributedLogSplitting {
  private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);
  static {
    // Disable short-circuit reads for the mini DFS cluster backing these tests.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
  }

123 static final int NUM_MASTERS = 2;
124 static final int NUM_RS = 6;
125
126 MiniHBaseCluster cluster;
127 HMaster master;
128 Configuration conf;
129 static Configuration originalConf;
130 static HBaseTestingUtility TEST_UTIL;
131 static MiniDFSCluster dfsCluster;
132 static MiniZooKeeperCluster zkCluster;
133
134 @BeforeClass
135 public static void setup() throws Exception {
136 TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
137 dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
138 zkCluster = TEST_UTIL.startMiniZKCluster();
139 originalConf = TEST_UTIL.getConfiguration();
140 }
141
142 @AfterClass
143 public static void tearDown() throws IOException {
144 TEST_UTIL.shutdownMiniZKCluster();
145 TEST_UTIL.shutdownMiniDFSCluster();
146 }
147
148 private void startCluster(int num_rs) throws Exception {
149 SplitLogCounters.resetCounters();
    LOG.info("Starting cluster");
    // Do not resubmit failed split tasks and do not retry ZK recovery, so failures surface quickly.
    conf.setLong("hbase.splitlog.max.resubmit", 0);
    conf.setInt("zookeeper.recovery.retry", 0);
154 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
155 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0);
156 conf.setInt("hbase.regionserver.wal.max.splitters", 3);
157 conf.setInt("hfile.format.version", 3);
158 conf.setInt("hbase.master.maximum.ping.server.attempts", 3);
159 TEST_UTIL.shutdownMiniHBaseCluster();
160 TEST_UTIL = new HBaseTestingUtility(conf);
161 TEST_UTIL.setDFSCluster(dfsCluster);
162 TEST_UTIL.setZkCluster(zkCluster);
163 TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
164 cluster = TEST_UTIL.getHBaseCluster();
165 LOG.info("Waiting for active/ready master");
166 cluster.waitForActiveAndReadyMaster();
167 master = cluster.getMaster();
168 while (cluster.getLiveRegionServerThreads().size() < num_rs) {
169 Threads.sleep(1);
170 }
171 }
172
173 @Before
174 public void before() throws Exception {
175
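    // Each test starts from a fresh copy of the original configuration.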
176 conf = HBaseConfiguration.create(originalConf);
177 }
178
179 @After
180 public void after() throws Exception {
181 try {
182 if (TEST_UTIL.getHBaseCluster() != null) {
183 for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
184 mt.getMaster().abort("closing...", null);
185 }
186 }
187 TEST_UTIL.shutdownMiniHBaseCluster();
188 } finally {
189 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
190 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
191 }
192 }
193
194 @Test (timeout=300000)
195 public void testRecoveredEdits() throws Exception {
196 LOG.info("testRecoveredEdits");
197 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024);
198 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
199 startCluster(NUM_RS);
200
201 final int NUM_LOG_LINES = 1000;
202 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
203
204
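    // Turn off the load balancer so regions stay where the test puts them.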
205 master.balanceSwitch(false);
206 FileSystem fs = master.getMasterFileSystem().getFileSystem();
207
208 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
209
210 Path rootdir = FSUtils.getRootDir(conf);
211
212 installTable(new ZooKeeperWatcher(conf, "table-creation", null),
213 "table", "family", 40);
214 TableName table = TableName.valueOf("table");
215 List<HRegionInfo> regions = null;
216 HRegionServer hrs = null;
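    // Find a region server that hosts at least one region of the test table.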
217 for (int i = 0; i < NUM_RS; i++) {
218 boolean foundRs = false;
219 hrs = rsts.get(i).getRegionServer();
220 regions = ProtobufUtil.getOnlineRegions(hrs);
221 for (HRegionInfo region : regions) {
222 if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
223 foundRs = true;
224 break;
225 }
226 }
227 if (foundRs) break;
228 }
229 final Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(hrs
230 .getServerName().toString()));
231
232 LOG.info("#regions = " + regions.size());
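    // Drop system-namespace regions; edits are written only for the user table.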
233 Iterator<HRegionInfo> it = regions.iterator();
234 while (it.hasNext()) {
235 HRegionInfo region = it.next();
236 if (region.getTable().getNamespaceAsString()
237 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
238 it.remove();
239 }
240 }
241 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
242
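    // Ask the SplitLogManager to split the chosen server's WAL directory.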
243 slm.splitLogDistributed(logDir);
244
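    // Every region should now have recovered.edits files; count the edits across all of them.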
245 int count = 0;
246 for (HRegionInfo hri : regions) {
247
248 Path tdir = FSUtils.getTableDir(rootdir, table);
249 @SuppressWarnings("deprecation")
250 Path editsdir =
251 HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
252 LOG.debug("checking edits dir " + editsdir);
253 FileStatus[] files = fs.listStatus(editsdir);
254 assertTrue(files.length > 1);
255 for (int i = 0; i < files.length; i++) {
256 int c = countHLog(files[i].getPath(), fs, conf);
257 count += c;
258 }
259 LOG.info(count + " edits in " + files.length + " recovered edits files.");
260 }
261
262
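    // The source WAL directory should be gone once splitting has completed.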
263 assertFalse(fs.exists(logDir));
264
265 assertEquals(NUM_LOG_LINES, count);
266 }
267
268 @Test(timeout = 300000)
269 public void testLogReplayWithNonMetaRSDown() throws Exception {
270 LOG.info("testLogReplayWithNonMetaRSDown");
271 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024);
272 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
273 startCluster(NUM_RS);
274 final int NUM_REGIONS_TO_CREATE = 40;
275 final int NUM_LOG_LINES = 1000;
276
277
278 master.balanceSwitch(false);
279
280 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
281 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
282
283 HRegionServer hrs = findRSToKill(false, "table");
284 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
285 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
286
287
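    // Abort the region server and verify all edits show up in the table after log replay.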
288 this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
289 ht.close();
290 zkw.close();
291 }
292
293 private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
294 private boolean isDups = false;
295 private LinkedList<Long> nonces = new LinkedList<Long>();
296
297 public void startDups() {
298 isDups = true;
299 }
300
301 @Override
302 public long newNonce() {
303 long nonce = isDups ? nonces.removeFirst() : super.newNonce();
304 if (!isDups) {
305 nonces.add(nonce);
306 }
307 return nonce;
308 }
309 }
310
311 @Test(timeout = 300000)
312 public void testNonceRecovery() throws Exception {
313 LOG.info("testNonceRecovery");
314 final String TABLE_NAME = "table";
315 final String FAMILY_NAME = "family";
316 final int NUM_REGIONS_TO_CREATE = 40;
317
318 conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
319 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
320 startCluster(NUM_RS);
321 master.balanceSwitch(false);
322
323 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
324 HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
325 NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
326 NonceGenerator oldNg =
327 HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), ng);
328
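    // Issue one increment per region while recording the nonces, then replay the same nonces
    // after recovery; every replayed increment must be rejected as a duplicate.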
329 try {
330 List<Increment> reqs = new ArrayList<Increment>();
331 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
332 HRegionServer hrs = rst.getRegionServer();
333 List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
334 for (HRegionInfo hri : hris) {
335 if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
336 byte[] key = hri.getStartKey();
337 if (key == null || key.length == 0) {
338 key = Bytes.copy(hri.getEndKey());
339 --(key[key.length - 1]);
340 }
341 Increment incr = new Increment(key);
342 incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
343 ht.increment(incr);
344 reqs.add(incr);
345 }
346 }
347 }
348
349 HRegionServer hrs = findRSToKill(false, "table");
350 abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
351 ng.startDups();
352 for (Increment incr : reqs) {
353 try {
354 ht.increment(incr);
355 fail("should have thrown");
356 } catch (OperationConflictException ope) {
357 LOG.debug("Caught as expected: " + ope.getMessage());
358 }
359 }
360 } finally {
361 HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
362 ht.close();
363 zkw.close();
364 }
365 }
366
367 @Test(timeout = 300000)
368 public void testLogReplayWithMetaRSDown() throws Exception {
    LOG.info("testLogReplayWithMetaRSDown");
370 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
371 startCluster(NUM_RS);
372 final int NUM_REGIONS_TO_CREATE = 40;
373 final int NUM_LOG_LINES = 1000;
374
375
376 master.balanceSwitch(false);
377
378 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
379 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
380
381 HRegionServer hrs = findRSToKill(true, "table");
382 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
383 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
384
385 this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
386 ht.close();
387 zkw.close();
388 }
389
390 private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw,
391 final int numRegions, final int numofLines) throws Exception {
392
393 abortRSAndWaitForRecovery(hrs, zkw, numRegions);
394 assertEquals(numofLines, TEST_UTIL.countRows(ht));
395 }
396
397 private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
398 final int numRegions) throws Exception {
399 final MiniHBaseCluster tmpCluster = this.cluster;
400
401
402 LOG.info("Aborting region server: " + hrs.getServerName());
403 hrs.abort("testing");
404
405
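    // Wait for the aborted server to drop out of the live region server list.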
406 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
407 @Override
408 public boolean evaluate() throws Exception {
409 return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
410 }
411 });
412
413
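    // Wait for all regions, including hbase:meta, to come back online on the surviving servers.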
414 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
415 @Override
416 public boolean evaluate() throws Exception {
417 return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 1));
418 }
419 });
420
421
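    // Wait for the recovering-regions znodes to be cleared, i.e. log replay has finished.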
422 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
423 @Override
424 public boolean evaluate() throws Exception {
425 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
426 zkw.recoveringRegionsZNode, false);
427 return (recoveringRegions != null && recoveringRegions.size() == 0);
428 }
429 });
430 }
431
432 @Test(timeout = 300000)
433 public void testMasterStartsUpWithLogSplittingWork() throws Exception {
434 LOG.info("testMasterStartsUpWithLogSplittingWork");
435 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
436 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
437 startCluster(NUM_RS);
438
439 final int NUM_REGIONS_TO_CREATE = 40;
440 final int NUM_LOG_LINES = 1000;
441
442
443 master.balanceSwitch(false);
444
445 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
446 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
447 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
448
449 HRegionServer hrs = findRSToKill(false, "table");
450 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
451 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
452
453
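    // Abort the active master first so a newly started master has to pick up the pending split work.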
454 abortMaster(cluster);
455
456
457 LOG.info("Aborting region server: " + hrs.getServerName());
458 hrs.abort("testing");
459
460
461 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
462 @Override
463 public boolean evaluate() throws Exception {
464 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
465 }
466 });
467
468 Thread.sleep(2000);
469 LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
470
471 startMasterAndWaitUntilLogSplit(cluster);
472
473
474 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
475 @Override
476 public boolean evaluate() throws Exception {
477 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
478 }
479 });
480
481 LOG.info("Current Open Regions After Master Node Starts Up:"
482 + getAllOnlineRegions(cluster).size());
483
484 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
485
486 ht.close();
487 zkw.close();
488 }
489
490 @Test(timeout = 300000)
491 public void testMasterStartsUpWithLogReplayWork() throws Exception {
492 LOG.info("testMasterStartsUpWithLogReplayWork");
493 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
494 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
495 startCluster(NUM_RS);
496
497 final int NUM_REGIONS_TO_CREATE = 40;
498 final int NUM_LOG_LINES = 1000;
499
500
501 master.balanceSwitch(false);
502
503 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
504 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
505 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
506
507 HRegionServer hrs = findRSToKill(false, "table");
508 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
509 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
510
511
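    // Abort the active master first so a newly started master has to drive the log replay.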
512 abortMaster(cluster);
513
514
515 LOG.info("Aborting region server: " + hrs.getServerName());
516 hrs.abort("testing");
517
518
519 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
520 @Override
521 public boolean evaluate() throws Exception {
522 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
523 }
524 });
525
526 Thread.sleep(2000);
527 LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
528
529 startMasterAndWaitUntilLogSplit(cluster);
530
531
532 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
533 @Override
534 public boolean evaluate() throws Exception {
535 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
536 zkw.recoveringRegionsZNode, false);
537 return (recoveringRegions != null && recoveringRegions.size() == 0);
538 }
539 });
540
541 LOG.info("Current Open Regions After Master Node Starts Up:"
542 + getAllOnlineRegions(cluster).size());
543
544 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
545
546 ht.close();
547 zkw.close();
548 }
549
550
551 @Test(timeout = 300000)
552 public void testLogReplayTwoSequentialRSDown() throws Exception {
    LOG.info("testLogReplayTwoSequentialRSDown");
554 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
555 startCluster(NUM_RS);
556 final int NUM_REGIONS_TO_CREATE = 40;
557 final int NUM_LOG_LINES = 1000;
558
559
560 master.balanceSwitch(false);
561
562 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
563 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
564 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
565
566 List<HRegionInfo> regions = null;
567 HRegionServer hrs1 = findRSToKill(false, "table");
568 regions = ProtobufUtil.getOnlineRegions(hrs1);
569
570 makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
571
572
573 LOG.info("Aborting region server: " + hrs1.getServerName());
574 hrs1.abort("testing");
575
576
577 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
578 @Override
579 public boolean evaluate() throws Exception {
580 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
581 }
582 });
583
584
585 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
586 @Override
587 public boolean evaluate() throws Exception {
588 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
589 }
590 });
591
592
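    // Give the surviving servers a moment to open the recovered regions before aborting another one.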
593 Thread.sleep(300);
594
595 rsts = cluster.getLiveRegionServerThreads();
596 HRegionServer hrs2 = rsts.get(0).getRegionServer();
597 LOG.info("Aborting one more region server: " + hrs2.getServerName());
598 hrs2.abort("testing");
599
600
601 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
602 @Override
603 public boolean evaluate() throws Exception {
604 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
605 }
606 });
607
608
609 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
610 @Override
611 public boolean evaluate() throws Exception {
612 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
613 }
614 });
615
616
617 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
618 @Override
619 public boolean evaluate() throws Exception {
620 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
621 zkw.recoveringRegionsZNode, false);
622 return (recoveringRegions != null && recoveringRegions.size() == 0);
623 }
624 });
625
626 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
627 ht.close();
628 zkw.close();
629 }
630
631 @Test(timeout = 300000)
632 public void testMarkRegionsRecoveringInZK() throws Exception {
633 LOG.info("testMarkRegionsRecoveringInZK");
634 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
635 startCluster(NUM_RS);
636 master.balanceSwitch(false);
637 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
638 final ZooKeeperWatcher zkw = master.getZooKeeperWatcher();
639 HTable ht = installTable(zkw, "table", "family", 40);
640 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
641
642 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
643 HRegionInfo region = null;
644 HRegionServer hrs = null;
645 ServerName firstFailedServer = null;
646 ServerName secondFailedServer = null;
647 for (int i = 0; i < NUM_RS; i++) {
648 hrs = rsts.get(i).getRegionServer();
649 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
650 if (regions.isEmpty()) continue;
651 region = regions.get(0);
652 regionSet.add(region);
653 firstFailedServer = hrs.getServerName();
654 secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
655 break;
656 }
657
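    // Mark the same region as recovering on behalf of two different failed servers.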
658 slm.markRegionsRecoveringInZK(firstFailedServer, regionSet);
659 slm.markRegionsRecoveringInZK(secondFailedServer, regionSet);
660
661 List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
662 ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
663
    assertEquals(2, recoveringRegions.size());
665
666
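    // The hosting region server should eventually report no regions left in recovery.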
667 final HRegionServer tmphrs = hrs;
668 TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
669 @Override
670 public boolean evaluate() throws Exception {
671 return (tmphrs.getRecoveringRegions().size() == 0);
672 }
673 });
674 ht.close();
675 zkw.close();
676 }
677
678 @Test(timeout = 300000)
679 public void testReplayCmd() throws Exception {
680 LOG.info("testReplayCmd");
681 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
682 startCluster(NUM_RS);
683 final int NUM_REGIONS_TO_CREATE = 40;
684
685
686 master.balanceSwitch(false);
687
688 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
689 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
690 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
691
692 List<HRegionInfo> regions = null;
693 HRegionServer hrs = null;
694 for (int i = 0; i < NUM_RS; i++) {
695 boolean isCarryingMeta = false;
696 hrs = rsts.get(i).getRegionServer();
697 regions = ProtobufUtil.getOnlineRegions(hrs);
698 for (HRegionInfo region : regions) {
699 if (region.isMetaRegion()) {
700 isCarryingMeta = true;
701 break;
702 }
703 }
704 if (isCarryingMeta) {
705 continue;
706 }
707 if (regions.size() > 0) break;
708 }
709
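    // Load the table, remember its checksum, and verify the data is unchanged after recovery.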
710 this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
711 String originalCheckSum = TEST_UTIL.checksumRows(ht);
712
713
714 abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
715
716 assertEquals("Data should remain after reopening of regions", originalCheckSum,
717 TEST_UTIL.checksumRows(ht));
718
719 ht.close();
720 zkw.close();
721 }
722
723 @Test(timeout = 300000)
724 public void testLogReplayForDisablingTable() throws Exception {
725 LOG.info("testLogReplayForDisablingTable");
726 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
727 startCluster(NUM_RS);
728 final int NUM_REGIONS_TO_CREATE = 40;
729 final int NUM_LOG_LINES = 1000;
730
731 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
732 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
733 HTable disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
734 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
735
736
737
738 master.balanceSwitch(false);
739
740 List<HRegionInfo> regions = null;
741 HRegionServer hrs = null;
742 boolean hasRegionsForBothTables = false;
743 String tableName = null;
744 for (int i = 0; i < NUM_RS; i++) {
745 tableName = null;
746 hasRegionsForBothTables = false;
747 boolean isCarryingSystem = false;
748 hrs = rsts.get(i).getRegionServer();
749 regions = ProtobufUtil.getOnlineRegions(hrs);
750 for (HRegionInfo region : regions) {
751 if (region.getTable().isSystemTable()) {
752 isCarryingSystem = true;
753 break;
754 }
755 if (tableName != null &&
756 !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
757
758 hasRegionsForBothTables = true;
759 break;
760 } else if (tableName == null) {
761 tableName = region.getTable().getNameAsString();
762 }
763 }
764 if (isCarryingSystem) {
765 continue;
766 }
767 if (hasRegionsForBothTables) {
768 break;
769 }
770 }
771
772
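    // The chosen region server must host regions of both tables so their edits share a single WAL.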
773 Assert.assertTrue(hasRegionsForBothTables);
774
775 LOG.info("#regions = " + regions.size());
776 Iterator<HRegionInfo> it = regions.iterator();
777 while (it.hasNext()) {
778 HRegionInfo region = it.next();
779 if (region.isMetaTable()) {
780 it.remove();
781 }
782 }
783 makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
784 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
785
786 LOG.info("Disabling table\n");
787 TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable"));
788
789
790 LOG.info("Aborting region server: " + hrs.getServerName());
791 hrs.abort("testing");
792
793
794 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
795 @Override
796 public boolean evaluate() throws Exception {
797 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
798 }
799 });
800
801
802 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
803 @Override
804 public boolean evaluate() throws Exception {
805 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
806 }
807 });
808
809
810 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
811 @Override
812 public boolean evaluate() throws Exception {
813 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
814 zkw.recoveringRegionsZNode, false);
815 ServerManager serverManager = master.getServerManager();
816 return (!serverManager.areDeadServersInProgress() &&
817 recoveringRegions != null && recoveringRegions.size() == 0);
818 }
819 });
820
821 int count = 0;
822 FileSystem fs = master.getMasterFileSystem().getFileSystem();
823 Path rootdir = FSUtils.getRootDir(conf);
824 Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
825 for (HRegionInfo hri : regions) {
826 @SuppressWarnings("deprecation")
827 Path editsdir =
828 HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
829 LOG.debug("checking edits dir " + editsdir);
830 if(!fs.exists(editsdir)) continue;
831 FileStatus[] files = fs.listStatus(editsdir);
832 if(files != null) {
833 for(FileStatus file : files) {
834 int c = countHLog(file.getPath(), fs, conf);
835 count += c;
836 LOG.info(c + " edits in " + file.getPath());
837 }
838 }
839 }
840
841 LOG.info("Verify edits in recovered.edits files");
842 assertEquals(NUM_LOG_LINES, count);
843 LOG.info("Verify replayed edits");
844 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
845
846
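    // Remove the recovered.edits directories left behind for the disabled table.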
847 for (HRegionInfo hri : regions) {
848 @SuppressWarnings("deprecation")
849 Path editsdir =
850 HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
851 fs.delete(editsdir, true);
852 }
853 disablingHT.close();
854 ht.close();
855 zkw.close();
856 }
857
858 @Test(timeout = 300000)
859 public void testDisallowWritesInRecovering() throws Exception {
860 LOG.info("testDisallowWritesInRecovering");
861 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
862 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
863 conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
864 startCluster(NUM_RS);
865 final int NUM_REGIONS_TO_CREATE = 40;
866
867
868 master.balanceSwitch(false);
869
870 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
871 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
872 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
873 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
874
875 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
876 HRegionInfo region = null;
877 HRegionServer hrs = null;
878 HRegionServer dstRS = null;
879 for (int i = 0; i < NUM_RS; i++) {
880 hrs = rsts.get(i).getRegionServer();
881 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
882 if (regions.isEmpty()) continue;
883 region = regions.get(0);
884 regionSet.add(region);
885 dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
886 break;
887 }
888
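    // Simulate a failure: mark the region as recovering, then move it to another server
    // while the marker is still in place.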
889 slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet);
890
891 final HRegionInfo hri = region;
892 final HRegionServer tmpRS = dstRS;
893 TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
894 Bytes.toBytes(dstRS.getServerName().getServerName()));
895
896 final RegionStates regionStates =
897 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
898 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
899 @Override
900 public boolean evaluate() throws Exception {
901 ServerName sn = regionStates.getRegionServerOfRegion(hri);
902 return (sn != null && sn.equals(tmpRS.getServerName()));
903 }
904 });
905
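    // A write to a region that is still marked as recovering must be rejected with
    // RegionInRecoveryException.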
906 try {
907 byte[] key = region.getStartKey();
908 if (key == null || key.length == 0) {
909 key = new byte[] { 0, 0, 0, 0, 1 };
910 }
911 ht.setAutoFlush(true, true);
912 Put put = new Put(key);
913 put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
914 ht.put(put);
915 ht.close();
916 } catch (IOException ioe) {
917 Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
918 RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
919 boolean foundRegionInRecoveryException = false;
920 for (Throwable t : re.getCauses()) {
921 if (t instanceof RegionInRecoveryException) {
922 foundRegionInRecoveryException = true;
923 break;
924 }
925 }
926 Assert.assertTrue(
927 "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
928 foundRegionInRecoveryException);
929 }
930
931 zkw.close();
932 }
933
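
  /**
   * Abort a region server while it is working on a split task and check that the worker
   * counters record the task being resigned, preempted, erred, or completed.
   */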
943 @Test (timeout=300000)
944 public void testWorkerAbort() throws Exception {
945 LOG.info("testWorkerAbort");
946 startCluster(3);
947 final int NUM_LOG_LINES = 10000;
948 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
949 FileSystem fs = master.getMasterFileSystem().getFileSystem();
950
951 final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
952 HRegionServer hrs = findRSToKill(false, "table");
953 Path rootdir = FSUtils.getRootDir(conf);
954 final Path logDir = new Path(rootdir,
955 HLogUtil.getHLogDirectoryName(hrs.getServerName().toString()));
956
957 installTable(new ZooKeeperWatcher(conf, "table-creation", null),
958 "table", "family", 40);
959
960 makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table", "family", NUM_LOG_LINES,
961 100);
962
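    // Once some worker has acquired the split task, abort one of the region servers.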
963 new Thread() {
964 public void run() {
965 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
966 for (RegionServerThread rst : rsts) {
967 rst.getRegionServer().abort("testing");
968 break;
969 }
970 }
971 }.start();
972
973 FileStatus[] logfiles = fs.listStatus(logDir);
974 TaskBatch batch = new TaskBatch();
975 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
976
977 long curt = System.currentTimeMillis();
978 long waitTime = 80000;
979 long endt = curt + waitTime;
980 while (curt < endt) {
981 if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
982 tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
983 tot_wkr_preempt_task.get()) == 0) {
984 Thread.yield();
985 curt = System.currentTimeMillis();
986 } else {
987 assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
988 tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
989 tot_wkr_preempt_task.get()));
990 return;
991 }
992 }
993 fail("none of the following counters went up in " + waitTime +
994 " milliseconds - " +
995 "tot_wkr_task_resigned, tot_wkr_task_err, " +
996 "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
997 "tot_wkr_preempt_task");
998 }
999
1000 @Test (timeout=300000)
1001 public void testThreeRSAbort() throws Exception {
1002 LOG.info("testThreeRSAbort");
1003 final int NUM_REGIONS_TO_CREATE = 40;
1004 final int NUM_ROWS_PER_REGION = 100;
1005
1006 startCluster(NUM_RS);
1007
1008 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
1009 "distributed log splitting test", null);
1010
1011 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1012 populateDataInTable(NUM_ROWS_PER_REGION, "family");
1013
1014
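    // Abort three region servers at once and make sure every row is still readable after recovery.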
1015 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1016 assertEquals(NUM_RS, rsts.size());
1017 rsts.get(0).getRegionServer().abort("testing");
1018 rsts.get(1).getRegionServer().abort("testing");
1019 rsts.get(2).getRegionServer().abort("testing");
1020
    long start = EnvironmentEdgeManager.currentTimeMillis();
    while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
        fail("Timed out waiting for the three aborted region servers to go down");
      }
      Thread.sleep(200);
    }
1028
    start = EnvironmentEdgeManager.currentTimeMillis();
    while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 1)) {
      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
        fail("Timed out waiting for all regions to come back online");
      }
      Thread.sleep(200);
    }
1036
1037
1038 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
1039 @Override
1040 public boolean evaluate() throws Exception {
1041 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
1042 zkw.recoveringRegionsZNode, false);
1043 return (recoveringRegions != null && recoveringRegions.size() == 0);
1044 }
1045 });
1046
1047 assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
1048 TEST_UTIL.countRows(ht));
1049 ht.close();
1050 zkw.close();
1051 }
1052
1053
1054
1055 @Test(timeout=30000)
1056 public void testDelayedDeleteOnFailure() throws Exception {
1057 LOG.info("testDelayedDeleteOnFailure");
1058 startCluster(1);
1059 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
1060 final FileSystem fs = master.getMasterFileSystem().getFileSystem();
1061 final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
1062 fs.mkdirs(logDir);
1063 ExecutorService executor = null;
1064 try {
1065 final Path corruptedLogFile = new Path(logDir, "x");
1066 FSDataOutputStream out;
1067 out = fs.create(corruptedLogFile);
1068 out.write(0);
1069 out.write(Bytes.toBytes("corrupted bytes"));
1070 out.close();
1071 slm.ignoreZKDeleteForTesting = true;
1072 executor = Executors.newSingleThreadExecutor();
1073 Runnable runnable = new Runnable() {
1074 @Override
1075 public void run() {
1076 try {
1077
1078
1079
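          // The first split attempt should fail because the only file in logDir is corrupted.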
1080 slm.splitLogDistributed(logDir);
1081 } catch (IOException ioe) {
1082 try {
1083 assertTrue(fs.exists(corruptedLogFile));
1084
1085
1086
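            // Retry the split; the corrupted file should still be present since the ZK delete
            // is being ignored.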
1087 slm.splitLogDistributed(logDir);
1088 } catch (IOException e) {
1089 assertTrue(Thread.currentThread().isInterrupted());
1090 return;
1091 }
1092 fail("did not get the expected IOException from the 2nd call");
1093 }
1094 fail("did not get the expected IOException from the 1st call");
1095 }
1096 };
1097 Future<?> result = executor.submit(runnable);
1098 try {
1099 result.get(2000, TimeUnit.MILLISECONDS);
1100 } catch (TimeoutException te) {
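      // Expected: the split call is still blocked waiting on the ignored ZK delete.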
1101
1102 }
1103 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
1104 executor.shutdownNow();
1105 executor = null;
1106
1107
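    // The runnable should have finished after the interrupt; surface any assertion failures it hit.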
1108 result.get();
1109 } finally {
1110 if (executor != null) {
1111
1112
1113 executor.shutdownNow();
1114 }
1115 fs.delete(logDir, true);
1116 }
1117 }
1118
1119 @Test(timeout = 300000)
1120 public void testMetaRecoveryInZK() throws Exception {
1121 LOG.info("testMetaRecoveryInZK");
1122 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1123 startCluster(NUM_RS);
1124
1125
1126
1127 master.balanceSwitch(false);
1128 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1129 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1130
1131
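    // Pick the region server that is carrying hbase:meta.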
1132 HRegionServer hrs = findRSToKill(true, null);
1133 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
1134
1135 LOG.info("#regions = " + regions.size());
1136 Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
1137 tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
1138 master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
1139 Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
1140 userRegionSet.addAll(regions);
1141 master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
1142 boolean isMetaRegionInRecovery = false;
1143 List<String> recoveringRegions =
1144 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1145 for (String curEncodedRegionName : recoveringRegions) {
1146 if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1147 isMetaRegionInRecovery = true;
1148 break;
1149 }
1150 }
1151 assertTrue(isMetaRegionInRecovery);
1152
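    // Split only the meta WAL; afterwards hbase:meta should no longer be marked as recovering.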
1153 master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
1154
1155 isMetaRegionInRecovery = false;
1156 recoveringRegions =
1157 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1158 for (String curEncodedRegionName : recoveringRegions) {
1159 if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1160 isMetaRegionInRecovery = true;
1161 break;
1162 }
1163 }
1164
1165 assertFalse(isMetaRegionInRecovery);
1166 zkw.close();
1167 }
1168
1169 @Test(timeout = 300000)
1170 public void testSameVersionUpdatesRecovery() throws Exception {
1171 LOG.info("testSameVersionUpdatesRecovery");
1172 conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1173 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1174 startCluster(NUM_RS);
1175 final AtomicLong sequenceId = new AtomicLong(100);
1176 final int NUM_REGIONS_TO_CREATE = 40;
1177 final int NUM_LOG_LINES = 1000;
1178
1179
1180 master.balanceSwitch(false);
1181
1182 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1183 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1184 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1185
1186 List<HRegionInfo> regions = null;
1187 HRegionServer hrs = null;
1188 for (int i = 0; i < NUM_RS; i++) {
1189 boolean isCarryingMeta = false;
1190 hrs = rsts.get(i).getRegionServer();
1191 regions = ProtobufUtil.getOnlineRegions(hrs);
1192 for (HRegionInfo region : regions) {
1193 if (region.isMetaRegion()) {
1194 isCarryingMeta = true;
1195 break;
1196 }
1197 }
1198 if (isCarryingMeta) {
1199 continue;
1200 }
1201 break;
1202 }
1203
1204 LOG.info("#regions = " + regions.size());
1205 Iterator<HRegionInfo> it = regions.iterator();
1206 while (it.hasNext()) {
1207 HRegionInfo region = it.next();
1208 if (region.isMetaTable()
1209 || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1210 it.remove();
1211 }
1212 }
1213 if (regions.size() == 0) return;
1214 HRegionInfo curRegionInfo = regions.get(0);
1215 byte[] startRow = curRegionInfo.getStartKey();
1216 if (startRow == null || startRow.length == 0) {
1217 startRow = new byte[] { 0, 0, 0, 0, 1 };
1218 }
1219 byte[] row = Bytes.incrementBytes(startRow, 1);
1220
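    // Derive a row key that sorts inside the chosen region.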
1221 row = Arrays.copyOfRange(row, 3, 8);
1222 long value = 0;
1223 byte[] tableName = Bytes.toBytes("table");
1224 byte[] family = Bytes.toBytes("family");
1225 byte[] qualifier = Bytes.toBytes("c1");
1226 long timeStamp = System.currentTimeMillis();
1227 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1228 htd.addFamily(new HColumnDescriptor(family));
1229 for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1230 WALEdit e = new WALEdit();
1231 value++;
1232 e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1233 hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
1234 System.currentTimeMillis(), htd, sequenceId);
1235 }
1236 hrs.getWAL().sync();
1237 hrs.getWAL().close();
1238
1239
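    // Abort the server holding the freshly written edits and wait for log replay to finish.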
1240 this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1241
1242
1243 LOG.info("Verification Starts...");
1244 Get g = new Get(row);
1245 Result r = ht.get(g);
1246 long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1247 assertEquals(value, theStoredVal);
1248
1249
1250 LOG.info("Verification after flush...");
1251 TEST_UTIL.getHBaseAdmin().flush(tableName);
1252 r = ht.get(g);
1253 theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1254 assertEquals(value, theStoredVal);
1255 ht.close();
1256 }
1257
1258 @Test(timeout = 300000)
1259 public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
    LOG.info("testSameVersionUpdatesRecoveryWithCompaction");
1261 conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1262 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1263 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
1264 conf.setInt("hbase.hstore.compactionThreshold", 3);
1265 startCluster(NUM_RS);
1266 final AtomicLong sequenceId = new AtomicLong(100);
1267 final int NUM_REGIONS_TO_CREATE = 40;
1268 final int NUM_LOG_LINES = 1000;
1269
1270
1271 master.balanceSwitch(false);
1272
1273 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1274 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1275 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1276
1277 List<HRegionInfo> regions = null;
1278 HRegionServer hrs = null;
1279 for (int i = 0; i < NUM_RS; i++) {
1280 boolean isCarryingMeta = false;
1281 hrs = rsts.get(i).getRegionServer();
1282 regions = ProtobufUtil.getOnlineRegions(hrs);
1283 for (HRegionInfo region : regions) {
1284 if (region.isMetaRegion()) {
1285 isCarryingMeta = true;
1286 break;
1287 }
1288 }
1289 if (isCarryingMeta) {
1290 continue;
1291 }
1292 break;
1293 }
1294
1295 LOG.info("#regions = " + regions.size());
1296 Iterator<HRegionInfo> it = regions.iterator();
1297 while (it.hasNext()) {
1298 HRegionInfo region = it.next();
1299 if (region.isMetaTable()
1300 || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1301 it.remove();
1302 }
1303 }
1304 if (regions.size() == 0) return;
1305 HRegionInfo curRegionInfo = regions.get(0);
1306 byte[] startRow = curRegionInfo.getStartKey();
1307 if (startRow == null || startRow.length == 0) {
1308 startRow = new byte[] { 0, 0, 0, 0, 1 };
1309 }
1310 byte[] row = Bytes.incrementBytes(startRow, 1);
1311
1312 row = Arrays.copyOfRange(row, 3, 8);
1313 long value = 0;
1314 final byte[] tableName = Bytes.toBytes("table");
1315 byte[] family = Bytes.toBytes("family");
1316 byte[] qualifier = Bytes.toBytes("c1");
1317 long timeStamp = System.currentTimeMillis();
1318 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1319 htd.addFamily(new HColumnDescriptor(family));
1320 for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1321 WALEdit e = new WALEdit();
1322 value++;
1323 e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1324 hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
1325 System.currentTimeMillis(), htd, sequenceId);
1326 }
1327 hrs.getWAL().sync();
1328 hrs.getWAL().close();
1329
1330
1331 this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1332
1333
1334 LOG.info("Verification Starts...");
1335 Get g = new Get(row);
1336 Result r = ht.get(g);
1337 long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1338 assertEquals(value, theStoredVal);
1339
1340
1341 LOG.info("Verification after flush...");
1342 TEST_UTIL.getHBaseAdmin().flush(tableName);
1343 TEST_UTIL.getHBaseAdmin().compact(tableName);
1344
1345
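    // Wait for the compaction triggered above to complete before re-reading the value.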
1346 TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
1347 @Override
1348 public boolean evaluate() throws Exception {
1349 return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
1350 }
1351 });
1352
1353 r = ht.get(g);
1354 theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1355 assertEquals(value, theStoredVal);
1356 ht.close();
1357 }
1358
1359 HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
1360 return installTable(zkw, tname, fname, nrs, 0);
1361 }
1362
1363 HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
1364 int existingRegions) throws Exception {
1365
1366 byte [] table = Bytes.toBytes(tname);
1367 byte [] family = Bytes.toBytes(fname);
1368 LOG.info("Creating table with " + nrs + " regions");
1369 HTable ht = TEST_UTIL.createTable(table, family);
1370 int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
1371 assertEquals(nrs, numRegions);
1372 LOG.info("Waiting for no more RIT\n");
1373 blockUntilNoRIT(zkw, master);
1374
1375
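    // Disable the table, verify only catalog and namespace regions remain online, then re-enable it.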
1376 LOG.debug("Disabling table\n");
1377 TEST_UTIL.getHBaseAdmin().disableTable(table);
1378 LOG.debug("Waiting for no more RIT\n");
1379 blockUntilNoRIT(zkw, master);
1380 NavigableSet<String> regions = getAllOnlineRegions(cluster);
1381 LOG.debug("Verifying only catalog and namespace regions are assigned\n");
    if (regions.size() != 2 + existingRegions) {
1383 for (String oregion : regions)
1384 LOG.debug("Region still online: " + oregion);
1385 }
1386 assertEquals(2 + existingRegions, regions.size());
1387 LOG.debug("Enabling table\n");
1388 TEST_UTIL.getHBaseAdmin().enableTable(table);
1389 LOG.debug("Waiting for no more RIT\n");
1390 blockUntilNoRIT(zkw, master);
1391 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
1392 regions = getAllOnlineRegions(cluster);
1393 assertEquals(numRegions + 2 + existingRegions, regions.size());
1394 return ht;
1395 }
1396
1397 void populateDataInTable(int nrows, String fname) throws Exception {
1398 byte [] family = Bytes.toBytes(fname);
1399
1400 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1401 assertEquals(NUM_RS, rsts.size());
1402
1403 for (RegionServerThread rst : rsts) {
1404 HRegionServer hrs = rst.getRegionServer();
1405 List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
1406 for (HRegionInfo hri : hris) {
1407 if (hri.getTable().isSystemTable()) {
1408 continue;
1409 }
1410 LOG.debug("adding data to rs = " + rst.getName() +
1411 " region = "+ hri.getRegionNameAsString());
1412 HRegion region = hrs.getOnlineRegion(hri.getRegionName());
1413 assertTrue(region != null);
1414 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
1415 }
1416 }
1417 }
1418
1419 public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1420 int num_edits, int edit_size) throws IOException {
1421 makeHLog(log, regions, tname, fname, num_edits, edit_size, true);
1422 }
1423
1424 public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1425 int num_edits, int edit_size, boolean closeLog) throws IOException {
1426 TableName fullTName = TableName.valueOf(tname);
1427
1428 regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
1429
1430 final AtomicLong sequenceId = new AtomicLong(10);
1431
1432
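    // Skip system-table regions; edits are generated only for user-table regions.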
1433 for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
1434 HRegionInfo regionInfo = iter.next();
1435 if(regionInfo.getTable().isSystemTable()) {
1436 iter.remove();
1437 }
1438 }
1439 HTableDescriptor htd = new HTableDescriptor(fullTName);
1440 byte[] family = Bytes.toBytes(fname);
1441 htd.addFamily(new HColumnDescriptor(family));
1442 byte[] value = new byte[edit_size];
1443
1444 List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
1445 for (HRegionInfo region : regions) {
1446 if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
1447 continue;
1448 }
1449 hris.add(region);
1450 }
1451 LOG.info("Creating wal edits across " + hris.size() + " regions.");
1452 for (int i = 0; i < edit_size; i++) {
1453 value[i] = (byte) ('a' + (i % 26));
1454 }
1455 int n = hris.size();
1456 int[] counts = new int[n];
1457 if (n > 0) {
1458 for (int i = 0; i < num_edits; i += 1) {
1459 WALEdit e = new WALEdit();
1460 HRegionInfo curRegionInfo = hris.get(i % n);
1461 byte[] startRow = curRegionInfo.getStartKey();
1462 if (startRow == null || startRow.length == 0) {
1463 startRow = new byte[] { 0, 0, 0, 0, 1 };
1464 }
1465 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
1466 row = Arrays.copyOfRange(row, 3, 8);
1467
1468
1469 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
1470 e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
1471 log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
1472 counts[i % n] += 1;
1473 }
1474 }
1475 log.sync();
1476 if(closeLog) {
1477 log.close();
1478 }
1479 for (int i = 0; i < n; i++) {
1480 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
1481 }
1482 return;
1483 }
1484
  private int countHLog(Path log, FileSystem fs, Configuration conf)
      throws IOException {
    int count = 0;
    HLog.Reader in = HLogFactory.createReader(fs, log, conf);
    try {
      while (in.next() != null) {
        count++;
      }
    } finally {
      in.close();
    }
    return count;
  }
1494
1495 private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
1496 throws KeeperException, InterruptedException {
1497 ZKAssign.blockUntilNoRIT(zkw);
1498 master.assignmentManager.waitUntilNoRegionsInTransition(60000);
1499 }
1500
1501 private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
1502 byte [] ...families)
1503 throws IOException {
1504 for(int i = 0; i < numRows; i++) {
1505 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
1506 for(byte [] family : families) {
1507 put.add(family, qf, null);
1508 }
1509 region.put(put);
1510 }
1511 }
1512
1513
1514
1515
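  /**
   * Writes one cell for every three-letter row key ('aaa' through 'zzz') using the given
   * family and column, then deletes the rows whose key starts with "aa".
   */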
1516 private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
1517 t.setAutoFlush(false, true);
1518 byte[] k = new byte[3];
1519
1520
1521 for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1522 for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1523 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1524 k[0] = b1;
1525 k[1] = b2;
1526 k[2] = b3;
1527 Put put = new Put(k);
1528 put.add(f, column, k);
1529 t.put(put);
1530 }
1531 }
1532 }
1533 t.flushCommits();
1534
1535 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1536 k[0] = 'a';
1537 k[1] = 'a';
1538 k[2] = b3;
1539 Delete del = new Delete(k);
1540 t.delete(del);
1541 }
1542 t.flushCommits();
1543 }
1544
1545 private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
1546 throws IOException {
1547 NavigableSet<String> online = new TreeSet<String>();
1548 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
1549 for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
1550 online.add(region.getRegionNameAsString());
1551 }
1552 }
1553 return online;
1554 }
1555
1556 private void waitForCounter(AtomicLong ctr, long oldval, long newval,
1557 long timems) {
1558 long curt = System.currentTimeMillis();
1559 long endt = curt + timems;
1560 while (curt < endt) {
1561 if (ctr.get() == oldval) {
1562 Thread.yield();
1563 curt = System.currentTimeMillis();
1564 } else {
1565 assertEquals(newval, ctr.get());
1566 return;
1567 }
1568 }
    fail("Counter did not change from " + oldval + " within " + timems + " ms");
1570 }
1571
1572 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
1573 for (MasterThread mt : cluster.getLiveMasterThreads()) {
1574 if (mt.getMaster().isActiveMaster()) {
1575 mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
1576 mt.join();
1577 break;
1578 }
1579 }
1580 LOG.debug("Master is aborted");
1581 }
1582
1583 private void startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
1584 throws IOException, InterruptedException {
1585 cluster.startMaster();
1586 HMaster master = cluster.getMaster();
1587 while (!master.isInitialized()) {
1588 Thread.sleep(100);
1589 }
1590 ServerManager serverManager = master.getServerManager();
1591 while (serverManager.areDeadServersInProgress()) {
1592 Thread.sleep(100);
1593 }
1594 }
1595
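  /**
   * Find a region server to kill: one that hosts at least one region of the given table and,
   * when hasMetaRegion is true, also hosts hbase:meta (a table region is moved onto the meta
   * server first if needed); when hasMetaRegion is false, one that does not host hbase:meta.
   */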
1603 private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
1604 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1605 int numOfRSs = rsts.size();
1606 List<HRegionInfo> regions = null;
1607 HRegionServer hrs = null;
1608
1609 for (int i = 0; i < numOfRSs; i++) {
1610 boolean isCarryingMeta = false;
1611 boolean foundTableRegion = false;
1612 hrs = rsts.get(i).getRegionServer();
1613 regions = ProtobufUtil.getOnlineRegions(hrs);
1614 for (HRegionInfo region : regions) {
1615 if (region.isMetaRegion()) {
1616 isCarryingMeta = true;
1617 }
1618 if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
1619 foundTableRegion = true;
1620 }
1621 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
1622 break;
1623 }
1624 }
1625 if (isCarryingMeta && hasMetaRegion) {
1626
1627 if (!foundTableRegion) {
1628 final HRegionServer destRS = hrs;
1629
1630 List<HRegionInfo> tableRegions =
1631 TEST_UTIL.getHBaseAdmin().getTableRegions(Bytes.toBytes(tableName));
1632 final HRegionInfo hri = tableRegions.get(0);
1633 TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
1634 Bytes.toBytes(destRS.getServerName().getServerName()));
1635
1636 final RegionStates regionStates =
1637 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
1638 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
1639 @Override
1640 public boolean evaluate() throws Exception {
1641 ServerName sn = regionStates.getRegionServerOfRegion(hri);
1642 return (sn != null && sn.equals(destRS.getServerName()));
1643 }
1644 });
1645 }
1646 return hrs;
1647 } else if (hasMetaRegion || isCarryingMeta) {
1648 continue;
1649 }
1650 if (foundTableRegion) break;
1651 }
1652
1653 return hrs;
1654 }
1655
1656 }