1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedExceptionAction;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.client.HBaseAdmin;
33 import org.apache.hadoop.hbase.regionserver.HRegionServer;
34 import org.apache.hadoop.hbase.security.User;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
37 import org.apache.hadoop.hbase.util.Threads;
38
39 import java.util.concurrent.CopyOnWriteArrayList;
40 import org.apache.hadoop.hbase.master.HMaster;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.Public
60 @InterfaceStability.Evolving
61 public class LocalHBaseCluster {
62 static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
63 private final List<JVMClusterUtil.MasterThread> masterThreads =
64 new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
65 private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66 new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
67 private final static int DEFAULT_NO = 1;
68
69 public static final String LOCAL = "local";
70
71 public static final String LOCAL_COLON = LOCAL + ":";
72 private final Configuration conf;
73 private final Class<? extends HMaster> masterClass;
74 private final Class<? extends HRegionServer> regionServerClass;
75
76
77
78
79
80
81 public LocalHBaseCluster(final Configuration conf)
82 throws IOException {
83 this(conf, DEFAULT_NO);
84 }
85
86
87
88
89
90
91
92
93 public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
94 throws IOException {
95 this(conf, 1, noRegionServers, getMasterImplementation(conf),
96 getRegionServerImplementation(conf));
97 }
98
99
100
101
102
103
104
105
106
107 public LocalHBaseCluster(final Configuration conf, final int noMasters,
108 final int noRegionServers)
109 throws IOException {
110 this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
111 getRegionServerImplementation(conf));
112 }
113
114 @SuppressWarnings("unchecked")
115 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
116 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
117 HRegionServer.class);
118 }
119
120 @SuppressWarnings("unchecked")
121 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
122 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
123 HMaster.class);
124 }
125
126
127
128
129
130
131
132
133
134
135
136 @SuppressWarnings("unchecked")
137 public LocalHBaseCluster(final Configuration conf, final int noMasters,
138 final int noRegionServers, final Class<? extends HMaster> masterClass,
139 final Class<? extends HRegionServer> regionServerClass)
140 throws IOException {
141 this.conf = conf;
142
143
144 conf.set(HConstants.MASTER_PORT, "0");
145 conf.set(HConstants.REGIONSERVER_PORT, "0");
146 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
147
148 this.masterClass = (Class<? extends HMaster>)
149 conf.getClass(HConstants.MASTER_IMPL, masterClass);
150
151 for (int i = 0; i < noMasters; i++) {
152 addMaster(new Configuration(conf), i);
153 }
154
155 this.regionServerClass =
156 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
157 regionServerClass);
158
159 for (int i = 0; i < noRegionServers; i++) {
160 addRegionServer(new Configuration(conf), i);
161 }
162 }
163
164 public JVMClusterUtil.RegionServerThread addRegionServer()
165 throws IOException {
166 return addRegionServer(new Configuration(conf), this.regionThreads.size());
167 }
168
169 @SuppressWarnings("unchecked")
170 public JVMClusterUtil.RegionServerThread addRegionServer(
171 Configuration config, final int index)
172 throws IOException {
173
174
175
176 JVMClusterUtil.RegionServerThread rst =
177 JVMClusterUtil.createRegionServerThread(config,
178 (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
179 this.regionServerClass), index);
180 this.regionThreads.add(rst);
181 return rst;
182 }
183
184 public JVMClusterUtil.RegionServerThread addRegionServer(
185 final Configuration config, final int index, User user)
186 throws IOException, InterruptedException {
187 return user.runAs(
188 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
189 public JVMClusterUtil.RegionServerThread run() throws Exception {
190 return addRegionServer(config, index);
191 }
192 });
193 }
194
195 public JVMClusterUtil.MasterThread addMaster() throws IOException {
196 return addMaster(new Configuration(conf), this.masterThreads.size());
197 }
198
199 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
200 throws IOException {
201
202
203
204 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
205 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
206 this.masterThreads.add(mt);
207 return mt;
208 }
209
210 public JVMClusterUtil.MasterThread addMaster(
211 final Configuration c, final int index, User user)
212 throws IOException, InterruptedException {
213 return user.runAs(
214 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
215 public JVMClusterUtil.MasterThread run() throws Exception {
216 return addMaster(c, index);
217 }
218 });
219 }
220
221
222
223
224
225 public HRegionServer getRegionServer(int serverNumber) {
226 return regionThreads.get(serverNumber).getRegionServer();
227 }
228
229
230
231
232 public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
233 return Collections.unmodifiableList(this.regionThreads);
234 }
235
236
237
238
239
240
241 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
242 List<JVMClusterUtil.RegionServerThread> liveServers =
243 new ArrayList<JVMClusterUtil.RegionServerThread>();
244 List<RegionServerThread> list = getRegionServers();
245 for (JVMClusterUtil.RegionServerThread rst: list) {
246 if (rst.isAlive()) liveServers.add(rst);
247 else LOG.info("Not alive " + rst.getName());
248 }
249 return liveServers;
250 }
251
252
253
254
255 public Configuration getConfiguration() {
256 return this.conf;
257 }
258
259
260
261
262
263
264
265 public String waitOnRegionServer(int serverNumber) {
266 JVMClusterUtil.RegionServerThread regionServerThread =
267 this.regionThreads.remove(serverNumber);
268 while (regionServerThread.isAlive()) {
269 try {
270 LOG.info("Waiting on " +
271 regionServerThread.getRegionServer().toString());
272 regionServerThread.join();
273 } catch (InterruptedException e) {
274 e.printStackTrace();
275 }
276 }
277 return regionServerThread.getName();
278 }
279
280
281
282
283
284
285
286 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
287 while (rst.isAlive()) {
288 try {
289 LOG.info("Waiting on " +
290 rst.getRegionServer().toString());
291 rst.join();
292 } catch (InterruptedException e) {
293 e.printStackTrace();
294 }
295 }
296 for (int i=0;i<regionThreads.size();i++) {
297 if (regionThreads.get(i) == rst) {
298 regionThreads.remove(i);
299 break;
300 }
301 }
302 return rst.getName();
303 }
304
305
306
307
308
309 public HMaster getMaster(int serverNumber) {
310 return masterThreads.get(serverNumber).getMaster();
311 }
312
313
314
315
316
317
318 public HMaster getActiveMaster() {
319 for (JVMClusterUtil.MasterThread mt : masterThreads) {
320 if (mt.getMaster().isActiveMaster()) {
321
322
323 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
324 return mt.getMaster();
325 }
326 }
327 }
328 return null;
329 }
330
331
332
333
334 public List<JVMClusterUtil.MasterThread> getMasters() {
335 return Collections.unmodifiableList(this.masterThreads);
336 }
337
338
339
340
341
342
343 public List<JVMClusterUtil.MasterThread> getLiveMasters() {
344 List<JVMClusterUtil.MasterThread> liveServers =
345 new ArrayList<JVMClusterUtil.MasterThread>();
346 List<JVMClusterUtil.MasterThread> list = getMasters();
347 for (JVMClusterUtil.MasterThread mt: list) {
348 if (mt.isAlive()) {
349 liveServers.add(mt);
350 }
351 }
352 return liveServers;
353 }
354
355
356
357
358
359
360
361 public String waitOnMaster(int serverNumber) {
362 JVMClusterUtil.MasterThread masterThread =
363 this.masterThreads.remove(serverNumber);
364 while (masterThread.isAlive()) {
365 try {
366 LOG.info("Waiting on " +
367 masterThread.getMaster().getServerName().toString());
368 masterThread.join();
369 } catch (InterruptedException e) {
370 e.printStackTrace();
371 }
372 }
373 return masterThread.getName();
374 }
375
376
377
378
379
380
381
382 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
383 while (masterThread.isAlive()) {
384 try {
385 LOG.info("Waiting on " +
386 masterThread.getMaster().getServerName().toString());
387 masterThread.join();
388 } catch (InterruptedException e) {
389 e.printStackTrace();
390 }
391 }
392 for (int i=0;i<masterThreads.size();i++) {
393 if (masterThreads.get(i) == masterThread) {
394 masterThreads.remove(i);
395 break;
396 }
397 }
398 return masterThread.getName();
399 }
400
401
402
403
404
405 public void join() {
406 if (this.regionThreads != null) {
407 for(Thread t: this.regionThreads) {
408 if (t.isAlive()) {
409 try {
410 Threads.threadDumpingIsAlive(t);
411 } catch (InterruptedException e) {
412 LOG.debug("Interrupted", e);
413 }
414 }
415 }
416 }
417 if (this.masterThreads != null) {
418 for (Thread t : this.masterThreads) {
419 if (t.isAlive()) {
420 try {
421 Threads.threadDumpingIsAlive(t);
422 } catch (InterruptedException e) {
423 LOG.debug("Interrupted", e);
424 }
425 }
426 }
427 }
428 }
429
430
431
432
433 public void startup() throws IOException {
434 JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
435 }
436
437
438
439
440 public void shutdown() {
441 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
442 }
443
444
445
446
447
448 public static boolean isLocal(final Configuration c) {
449 boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
450 return(mode == HConstants.CLUSTER_IS_LOCAL);
451 }
452
453
454
455
456
457
458 public static void main(String[] args) throws IOException {
459 Configuration conf = HBaseConfiguration.create();
460 LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
461 cluster.startup();
462 HBaseAdmin admin = new HBaseAdmin(conf);
463 HTableDescriptor htd =
464 new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
465 admin.createTable(htd);
466 cluster.shutdown();
467 }
468 }