1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.HBaseTestingUtility;
25 import org.apache.hadoop.hbase.testclassification.LargeTests;
26 import org.apache.hadoop.hbase.UnknownScannerException;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.client.ResultScanner;
29 import org.apache.hadoop.hbase.client.Scan;
30 import org.junit.experimental.categories.Category;
31
32 import static org.junit.Assert.fail;
33
34 @Category(LargeTests.class)
35 public class TestReplicationKillRS extends TestReplicationBase {
36
37 private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
38
39
40
41
42
43
44
45
46
47 public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception {
48
49
50 int rsToKill1 =
51 util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
52
53
54 Thread killer = killARegionServer(util, 5000, rsToKill1);
55
56 LOG.info("Start loading table");
57 int initialCount = utility1.loadTable(htable1, famName);
58 LOG.info("Done loading table");
59 killer.join(5000);
60 LOG.info("Done waiting for threads");
61
62 Result[] res;
63 while (true) {
64 try {
65 Scan scan = new Scan();
66 ResultScanner scanner = htable1.getScanner(scan);
67 res = scanner.next(initialCount);
68 scanner.close();
69 break;
70 } catch (UnknownScannerException ex) {
71 LOG.info("Cluster wasn't ready yet, restarting scanner");
72 }
73 }
74
75
76 if (res.length != initialCount) {
77 LOG.warn("We lost some rows on the master cluster!");
78
79 initialCount = res.length;
80 }
81
82 int lastCount = 0;
83
84 final long start = System.currentTimeMillis();
85 int i = 0;
86 while (true) {
87 if (i==NB_RETRIES-1) {
88 fail("Waited too much time for queueFailover replication. " +
89 "Waited "+(System.currentTimeMillis() - start)+"ms.");
90 }
91 Scan scan2 = new Scan();
92 ResultScanner scanner2 = htable2.getScanner(scan2);
93 Result[] res2 = scanner2.next(initialCount * 2);
94 scanner2.close();
95 if (res2.length < initialCount) {
96 if (lastCount < res2.length) {
97 i--;
98 } else {
99 i++;
100 }
101 lastCount = res2.length;
102 LOG.info("Only got " + lastCount + " rows instead of " +
103 initialCount + " current i=" + i);
104 Thread.sleep(SLEEP_TIME*2);
105 } else {
106 break;
107 }
108 }
109 }
110
111 private static Thread killARegionServer(final HBaseTestingUtility utility,
112 final long timeout, final int rs) {
113 Thread killer = new Thread() {
114 public void run() {
115 try {
116 Thread.sleep(timeout);
117 utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
118 } catch (Exception e) {
119 LOG.error("Couldn't kill a region server", e);
120 }
121 }
122 };
123 killer.setDaemon(true);
124 killer.start();
125 return killer;
126 }
127 }