1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.lang.reflect.Constructor;
23 import java.lang.reflect.InvocationTargetException;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.hbase.master.HMaster;
32 import org.apache.hadoop.hbase.regionserver.HRegionServer;
33 import org.apache.hadoop.hbase.regionserver.ShutdownHook;
34 import org.apache.hadoop.util.ReflectionUtils;
35
36
37
38
39 @InterfaceAudience.Private
40 public class JVMClusterUtil {
41 private static final Log LOG = LogFactory.getLog(JVMClusterUtil.class);
42
43
44
45
46 public static class RegionServerThread extends Thread {
47 private final HRegionServer regionServer;
48
49 public RegionServerThread(final HRegionServer r, final int index) {
50 super(r, "RS:" + index + ";" + r.getServerName().toShortString());
51 this.regionServer = r;
52 }
53
54
55 public HRegionServer getRegionServer() {
56 return this.regionServer;
57 }
58
59
60
61
62
63 public void waitForServerOnline() {
64
65
66
67
68 regionServer.waitForServerOnline();
69 }
70 }
71
72
73
74
75
76
77
78
79
80
81 public static JVMClusterUtil.RegionServerThread createRegionServerThread(
82 final Configuration c, final Class<? extends HRegionServer> hrsc,
83 final int index)
84 throws IOException {
85 HRegionServer server;
86 try {
87 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
88 ctor.setAccessible(true);
89 server = ctor.newInstance(c);
90 } catch (InvocationTargetException ite) {
91 Throwable target = ite.getTargetException();
92 throw new RuntimeException("Failed construction of RegionServer: " +
93 hrsc.toString() + ((target.getCause() != null)?
94 target.getCause().getMessage(): ""), target);
95 } catch (Exception e) {
96 IOException ioe = new IOException();
97 ioe.initCause(e);
98 throw ioe;
99 }
100 return new JVMClusterUtil.RegionServerThread(server, index);
101 }
102
103
104
105
106
107 public static class MasterThread extends Thread {
108 private final HMaster master;
109
110 public MasterThread(final HMaster m, final int index) {
111 super(m, "M:" + index + ";" + m.getServerName().toShortString());
112 this.master = m;
113 }
114
115
116 public HMaster getMaster() {
117 return this.master;
118 }
119 }
120
121
122
123
124
125
126
127
128
129
130 public static JVMClusterUtil.MasterThread createMasterThread(
131 final Configuration c, final Class<? extends HMaster> hmc,
132 final int index)
133 throws IOException {
134 HMaster server;
135 try {
136 server = hmc.getConstructor(Configuration.class).newInstance(c);
137 } catch (InvocationTargetException ite) {
138 Throwable target = ite.getTargetException();
139 throw new RuntimeException("Failed construction of Master: " +
140 hmc.toString() + ((target.getCause() != null)?
141 target.getCause().getMessage(): ""), target);
142 } catch (Exception e) {
143 IOException ioe = new IOException();
144 ioe.initCause(e);
145 throw ioe;
146 }
147 return new JVMClusterUtil.MasterThread(server, index);
148 }
149
150 private static JVMClusterUtil.MasterThread findActiveMaster(
151 List<JVMClusterUtil.MasterThread> masters) {
152 for (JVMClusterUtil.MasterThread t : masters) {
153 if (t.master.isActiveMaster()) {
154 return t;
155 }
156 }
157
158 return null;
159 }
160
161
162
163
164
165
166
167
168 public static String startup(final List<JVMClusterUtil.MasterThread> masters,
169 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
170
171 if (masters == null || masters.isEmpty()) {
172 return null;
173 }
174
175 for (JVMClusterUtil.MasterThread t : masters) {
176 t.start();
177 }
178
179
180
181
182 long startTime = System.currentTimeMillis();
183 while (findActiveMaster(masters) == null) {
184 try {
185 Thread.sleep(100);
186 } catch (InterruptedException ignored) {
187 }
188 if (System.currentTimeMillis() > startTime + 30000) {
189 throw new RuntimeException("Master not active after 30 seconds");
190 }
191 }
192
193 if (regionservers != null) {
194 for (JVMClusterUtil.RegionServerThread t: regionservers) {
195 HRegionServer hrs = t.getRegionServer();
196 ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
197 .getConfiguration()), hrs, t);
198 t.start();
199 }
200 }
201
202
203
204 startTime = System.currentTimeMillis();
205 final int maxwait = 200000;
206 while (true) {
207 JVMClusterUtil.MasterThread t = findActiveMaster(masters);
208 if (t != null && t.master.isInitialized()) {
209 return t.master.getServerName().toString();
210 }
211
212 if (System.currentTimeMillis() > startTime + 10000) {
213
214 Threads.sleep(1000);
215 }
216 if (System.currentTimeMillis() > startTime + maxwait) {
217 String msg = "Master not initialized after " + maxwait + "ms seconds";
218 Threads.printThreadInfo(System.out,
219 "Thread dump because: " + msg);
220 throw new RuntimeException(msg);
221 }
222 try {
223 Thread.sleep(100);
224 } catch (InterruptedException ignored) {
225
226 }
227 }
228 }
229
230
231
232
233
234 public static void shutdown(final List<MasterThread> masters,
235 final List<RegionServerThread> regionservers) {
236 LOG.debug("Shutting down HBase Cluster");
237 if (masters != null) {
238
239 JVMClusterUtil.MasterThread activeMaster = null;
240 for (JVMClusterUtil.MasterThread t : masters) {
241 if (!t.master.isActiveMaster()) {
242 t.master.stopMaster();
243 } else {
244 activeMaster = t;
245 }
246 }
247
248 if (activeMaster != null)
249 activeMaster.master.shutdown();
250
251 }
252 boolean wasInterrupted = false;
253 final long maxTime = System.currentTimeMillis() + 30 * 1000;
254 if (regionservers != null) {
255
256 for (RegionServerThread t : regionservers) {
257 t.getRegionServer().stop("Shutdown requested");
258 }
259 for (RegionServerThread t : regionservers) {
260 long now = System.currentTimeMillis();
261 if (t.isAlive() && !wasInterrupted && now < maxTime) {
262 try {
263 t.join(maxTime - now);
264 } catch (InterruptedException e) {
265 LOG.info("Got InterruptedException on shutdown - " +
266 "not waiting anymore on region server ends", e);
267 wasInterrupted = true;
268 }
269 }
270 }
271
272
273 for (int i = 0; i < 100; ++i) {
274 boolean atLeastOneLiveServer = false;
275 for (RegionServerThread t : regionservers) {
276 if (t.isAlive()) {
277 atLeastOneLiveServer = true;
278 try {
279 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
280 t.join(1000);
281 } catch (InterruptedException e) {
282 wasInterrupted = true;
283 }
284 }
285 }
286 if (!atLeastOneLiveServer) break;
287 for (RegionServerThread t : regionservers) {
288 if (t.isAlive()) {
289 LOG.warn("RegionServerThreads taking too long to stop, interrupting");
290 t.interrupt();
291 }
292 }
293 }
294 }
295
296 if (masters != null) {
297 for (JVMClusterUtil.MasterThread t : masters) {
298 while (t.master.isAlive() && !wasInterrupted) {
299 try {
300
301
302
303 Threads.threadDumpingIsAlive(t.master.getThread());
304 } catch(InterruptedException e) {
305 LOG.info("Got InterruptedException on shutdown - " +
306 "not waiting anymore on master ends", e);
307 wasInterrupted = true;
308 }
309 }
310 }
311 }
312 LOG.info("Shutdown of " +
313 ((masters != null) ? masters.size() : "0") + " master(s) and " +
314 ((regionservers != null) ? regionservers.size() : "0") +
315 " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
316
317 if (wasInterrupted){
318 Thread.currentThread().interrupt();
319 }
320 }
321 }