1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.LocalHBaseCluster;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
33 import org.apache.hadoop.hbase.master.HMaster;
34 import org.apache.hadoop.hbase.master.ServerManager;
35 import org.apache.hadoop.hbase.testclassification.MediumTests;
36 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
37 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
38 import org.apache.zookeeper.KeeperException;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43
44 @Category(MediumTests.class)
45 public class TestRegionServerReportForDuty {
46
47 private static final Log LOG = LogFactory.getLog(TestRegionServerReportForDuty.class);
48
49 private static final long SLEEP_INTERVAL = 500;
50
51 private HBaseTestingUtility testUtil;
52 private LocalHBaseCluster cluster;
53 private RegionServerThread rs;
54 private RegionServerThread rs2;
55 private MasterThread master;
56 private MasterThread backupMaster;
57
58 @Before
59 public void setUp() throws Exception {
60 testUtil = new HBaseTestingUtility();
61 testUtil.startMiniDFSCluster(1);
62 testUtil.startMiniZKCluster(1);
63 testUtil.createRootDir();
64 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
65 }
66
67 @After
68 public void tearDown() throws Exception {
69 cluster.shutdown();
70 cluster.join();
71 testUtil.shutdownMiniZKCluster();
72 testUtil.shutdownMiniDFSCluster();
73 }
74
75
76
77
78
79 @Test (timeout=180000)
80 public void testReportForDutyWithMasterChange() throws Exception {
81
82
83
84 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
85 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
86 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
87 master = cluster.addMaster();
88 rs = cluster.addRegionServer();
89 LOG.debug("Starting master: " + master.getMaster().getServerName());
90 master.start();
91 rs.start();
92
93
94 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
95 rs2 = cluster.addRegionServer();
96
97
98
99 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
100 rs2.start();
101
102 waitForClusterOnline(master);
103
104
105 master.getMaster().stop("Stopping master");
106
107
108
109 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
110 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
111 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
112 backupMaster = cluster.addMaster();
113 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
114 backupMaster.start();
115
116 waitForClusterOnline(backupMaster);
117
118
119 assertTrue(backupMaster.getMaster().isActiveMaster());
120 assertTrue(backupMaster.getMaster().isInitialized());
121 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2);
122
123 }
124
125 private void waitForClusterOnline(MasterThread master) throws InterruptedException {
126 while (true) {
127 if (master.getMaster().isInitialized()
128 && ((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
129 break;
130 }
131 Thread.sleep(SLEEP_INTERVAL);
132 LOG.debug("Waiting for master to come online ...");
133 }
134 rs.waitForServerOnline();
135 }
136
137
138
139
140
141
142 public static class MyRegionServer extends MiniHBaseClusterRegionServer {
143
144 private ServerName sn;
145
146
147 private boolean rpcStubCreatedFlag = false;
148 private boolean masterChanged = false;
149
150 public MyRegionServer(Configuration conf)
151 throws IOException, KeeperException,
152 InterruptedException {
153 super(conf);
154 }
155
156 @Override
157 protected synchronized ServerName createRegionServerStatusStub() {
158 sn = super.createRegionServerStatusStub();
159 rpcStubCreatedFlag = true;
160
161
162 while (!masterChanged) {
163 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
164 if (newSn != null && !newSn.equals(sn)) {
165 masterChanged = true;
166 break;
167 }
168 try {
169 Thread.sleep(SLEEP_INTERVAL);
170 } catch (InterruptedException e) {
171 }
172 LOG.debug("Waiting for master switch over ... ");
173 }
174 return sn;
175 }
176
177 public boolean getRpcStubCreatedFlag() {
178 return rpcStubCreatedFlag;
179 }
180 }
181 }