1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedAction;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.hbase.client.HConnectionManager;
33 import org.apache.hadoop.hbase.master.HMaster;
34 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
36 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
37 import org.apache.hadoop.hbase.regionserver.HRegion;
38 import org.apache.hadoop.hbase.regionserver.HRegionServer;
39 import org.apache.hadoop.hbase.security.User;
40 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil;
42 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
43 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
44 import org.apache.hadoop.hbase.util.Threads;
45
46
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Evolving
54 public class MiniHBaseCluster extends HBaseCluster {
/** Logger used by this class and by the nested region server and shutdown-thread classes. */
55 static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
/** The in-process cluster every operation delegates to; assigned in init(). */
56 public LocalHBaseCluster hbaseCluster;
/** Counter appended to the ".hfs." suffix each time a distinct user is requested for a new server. */
57 private static int index;
58
59
60
61
62
63
64
/**
 * Creates a mini cluster with exactly one master and {@code numRegionServers} region servers
 * by delegating to the three-argument constructor.
 *
 * @param conf configuration passed on to the cluster
 * @param numRegionServers number of region servers to add at startup
 * @throws IOException propagated from the delegated constructor
 * @throws InterruptedException propagated from the delegated constructor
 */
public MiniHBaseCluster(
    Configuration conf,
    int numRegionServers) throws IOException, InterruptedException {
  this(conf, 1, numRegionServers);
}
69
70
71
72
73
74
75
76
/**
 * Creates a mini cluster with the given number of masters and region servers. No master or
 * region server class is supplied ({@code null} is passed for both), so the five-argument
 * constructor's initialization falls back to its defaults.
 *
 * @param conf configuration passed on to the cluster
 * @param numMasters number of masters to create
 * @param numRegionServers number of region servers to add at startup
 * @throws IOException propagated from the delegated constructor
 * @throws InterruptedException propagated from the delegated constructor
 */
public MiniHBaseCluster(
    Configuration conf,
    int numMasters,
    int numRegionServers) throws IOException, InterruptedException {
  this(conf, numMasters, numRegionServers, null, null);
}
82
83 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
84 Class<? extends HMaster> masterClass,
85 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
86 throws IOException, InterruptedException {
87 super(conf);
88 conf.set(HConstants.MASTER_PORT, "0");
89
90
91 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
92
93 init(numMasters, numRegionServers, masterClass, regionserverClass);
94 this.initialClusterStatus = getClusterStatus();
95 }
96
97 public Configuration getConfiguration() {
98 return this.conf;
99 }
100
101
102
103
104
105
106
107 public static class MiniHBaseClusterRegionServer extends HRegionServer {
108 private Thread shutdownThread = null;
109 private User user = null;
110 public static boolean TEST_SKIP_CLOSE = false;
111
112 public MiniHBaseClusterRegionServer(Configuration conf)
113 throws IOException, InterruptedException {
114 super(conf);
115 this.user = User.getCurrent();
116 }
117
118
119
120
121
122
123
124
125
126 @Override
127 protected void handleReportForDutyResponse(
128 final RegionServerStartupResponse c) throws IOException {
129 super.handleReportForDutyResponse(c);
130
131 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
132 }
133
134 @Override
135 public void run() {
136 try {
137 this.user.runAs(new PrivilegedAction<Object>(){
138 public Object run() {
139 runRegionServer();
140 return null;
141 }
142 });
143 } catch (Throwable t) {
144 LOG.error("Exception in run", t);
145 } finally {
146
147 if (this.shutdownThread != null) {
148 this.shutdownThread.start();
149 Threads.shutdown(this.shutdownThread, 30000);
150 }
151 }
152 }
153
154 private void runRegionServer() {
155 super.run();
156 }
157
158 @Override
159 public void kill() {
160 super.kill();
161 }
162
163 public void abort(final String reason, final Throwable cause) {
164 this.user.runAs(new PrivilegedAction<Object>() {
165 public Object run() {
166 abortRegionServer(reason, cause);
167 return null;
168 }
169 });
170 }
171
172 private void abortRegionServer(String reason, Throwable cause) {
173 super.abort(reason, cause);
174 }
175 }
176
177
178
179
180
181 static class SingleFileSystemShutdownThread extends Thread {
182 private final FileSystem fs;
183 SingleFileSystemShutdownThread(final FileSystem fs) {
184 super("Shutdown of " + fs);
185 this.fs = fs;
186 }
187 @Override
188 public void run() {
189 try {
190 LOG.info("Hook closing fs=" + this.fs);
191 this.fs.close();
192 } catch (NullPointerException npe) {
193 LOG.debug("Need to fix these: " + npe.toString());
194 } catch (IOException e) {
195 LOG.warn("Running hook", e);
196 }
197 }
198 }
199
200 private void init(final int nMasterNodes, final int nRegionNodes,
201 Class<? extends HMaster> masterClass,
202 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
203 throws IOException, InterruptedException {
204 try {
205 if (masterClass == null){
206 masterClass = HMaster.class;
207 }
208 if (regionserverClass == null){
209 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
210 }
211
212
213 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
214 masterClass, regionserverClass);
215
216
217 for (int i=0; i<nRegionNodes; i++) {
218 Configuration rsConf = HBaseConfiguration.create(conf);
219 User user = HBaseTestingUtility.getDifferentUser(rsConf,
220 ".hfs."+index++);
221 hbaseCluster.addRegionServer(rsConf, i, user);
222 }
223
224 hbaseCluster.startup();
225 } catch (IOException e) {
226 shutdown();
227 throw e;
228 } catch (Throwable t) {
229 LOG.error("Error starting cluster", t);
230 shutdown();
231 throw new IOException("Shutting down", t);
232 }
233 }
234
235 @Deprecated
236 public void startRegionServer(String hostname) throws IOException {
237 this.startRegionServer();
238 }
239
240 @Override
241 public void startRegionServer(String hostname, int port) throws IOException {
242 this.startRegionServer();
243 }
244
245 @Override
246 public void killRegionServer(ServerName serverName) throws IOException {
247 HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
248 if (server instanceof MiniHBaseClusterRegionServer) {
249 LOG.info("Killing " + server.toString());
250 ((MiniHBaseClusterRegionServer) server).kill();
251 } else {
252 abortRegionServer(getRegionServerIndex(serverName));
253 }
254 }
255
256 @Override
257 public void stopRegionServer(ServerName serverName) throws IOException {
258 stopRegionServer(getRegionServerIndex(serverName));
259 }
260
261 @Override
262 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
263
264 waitOnRegionServer(getRegionServerIndex(serverName));
265 }
266
267 @Override
268 public void startMaster(String hostname, int port) throws IOException {
269 this.startMaster();
270 }
271
272 @Override
273 public void killMaster(ServerName serverName) throws IOException {
274 abortMaster(getMasterIndex(serverName));
275 }
276
277 @Override
278 public void stopMaster(ServerName serverName) throws IOException {
279 stopMaster(getMasterIndex(serverName));
280 }
281
282 @Override
283 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
284
285 waitOnMaster(getMasterIndex(serverName));
286 }
287
288
289
290
291
292
293
294 public JVMClusterUtil.RegionServerThread startRegionServer()
295 throws IOException {
296 final Configuration newConf = HBaseConfiguration.create(conf);
297 User rsUser =
298 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
299 JVMClusterUtil.RegionServerThread t = null;
300 try {
301 t = hbaseCluster.addRegionServer(
302 newConf, hbaseCluster.getRegionServers().size(), rsUser);
303 t.start();
304 t.waitForServerOnline();
305 } catch (InterruptedException ie) {
306 throw new IOException("Interrupted adding regionserver to cluster", ie);
307 }
308 return t;
309 }
310
311
312
313
314
315 public String abortRegionServer(int serverNumber) {
316 HRegionServer server = getRegionServer(serverNumber);
317 LOG.info("Aborting " + server.toString());
318 server.abort("Aborting for tests", new Exception("Trace info"));
319 return server.toString();
320 }
321
322
323
324
325
326
327
328 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
329 return stopRegionServer(serverNumber, true);
330 }
331
332
333
334
335
336
337
338
339
340
341
342 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
343 final boolean shutdownFS) {
344 JVMClusterUtil.RegionServerThread server =
345 hbaseCluster.getRegionServers().get(serverNumber);
346 LOG.info("Stopping " + server.toString());
347 server.getRegionServer().stop("Stopping rs " + serverNumber);
348 return server;
349 }
350
351
352
353
354
355
356
357 public String waitOnRegionServer(final int serverNumber) {
358 return this.hbaseCluster.waitOnRegionServer(serverNumber);
359 }
360
361
362
363
364
365
366
367
368 public JVMClusterUtil.MasterThread startMaster() throws IOException {
369 Configuration c = HBaseConfiguration.create(conf);
370 User user =
371 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
372
373 JVMClusterUtil.MasterThread t = null;
374 try {
375 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
376 t.start();
377 } catch (InterruptedException ie) {
378 throw new IOException("Interrupted adding master to cluster", ie);
379 }
380 return t;
381 }
382
383
384
385
386
387 public HMaster getMaster() {
388 return this.hbaseCluster.getActiveMaster();
389 }
390
391
392
393
394
395 public HMaster getMaster(final int serverNumber) {
396 return this.hbaseCluster.getMaster(serverNumber);
397 }
398
399
400
401
402
403 public String abortMaster(int serverNumber) {
404 HMaster server = getMaster(serverNumber);
405 LOG.info("Aborting " + server.toString());
406 server.abort("Aborting for tests", new Exception("Trace info"));
407 return server.toString();
408 }
409
410
411
412
413
414
415
416 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
417 return stopMaster(serverNumber, true);
418 }
419
420
421
422
423
424
425
426
427
428
429
430 public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
431 final boolean shutdownFS) {
432 JVMClusterUtil.MasterThread server =
433 hbaseCluster.getMasters().get(serverNumber);
434 LOG.info("Stopping " + server.toString());
435 server.getMaster().stop("Stopping master " + serverNumber);
436 return server;
437 }
438
439
440
441
442
443
444
445 public String waitOnMaster(final int serverNumber) {
446 return this.hbaseCluster.waitOnMaster(serverNumber);
447 }
448
449
450
451
452
453
454
455
456
457 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
458 List<JVMClusterUtil.MasterThread> mts;
459 long start = System.currentTimeMillis();
460 while (!(mts = getMasterThreads()).isEmpty()
461 && (System.currentTimeMillis() - start) < timeout) {
462 for (JVMClusterUtil.MasterThread mt : mts) {
463 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
464 return true;
465 }
466 }
467
468 Threads.sleep(100);
469 }
470 return false;
471 }
472
473
474
475
476 public List<JVMClusterUtil.MasterThread> getMasterThreads() {
477 return this.hbaseCluster.getMasters();
478 }
479
480
481
482
483 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
484 return this.hbaseCluster.getLiveMasters();
485 }
486
487
488
489
490 public void join() {
491 this.hbaseCluster.join();
492 }
493
494
495
496
497
498 public void shutdown() throws IOException {
499 if (this.hbaseCluster != null) {
500 this.hbaseCluster.shutdown();
501 }
502 HConnectionManager.deleteAllConnections(false);
503 }
504
505 @Override
506 public void close() throws IOException {
507 }
508
509 @Override
510 public ClusterStatus getClusterStatus() throws IOException {
511 HMaster master = getMaster();
512 return master == null ? null : master.getClusterStatus();
513 }
514
515
516
517
518
519 public void flushcache() throws IOException {
520 for (JVMClusterUtil.RegionServerThread t:
521 this.hbaseCluster.getRegionServers()) {
522 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
523 r.flushcache();
524 }
525 }
526 }
527
528
529
530
531
532 public void flushcache(TableName tableName) throws IOException {
533 for (JVMClusterUtil.RegionServerThread t:
534 this.hbaseCluster.getRegionServers()) {
535 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
536 if(r.getTableDesc().getTableName().equals(tableName)) {
537 r.flushcache();
538 }
539 }
540 }
541 }
542
543
544
545
546
547 public void compact(boolean major) throws IOException {
548 for (JVMClusterUtil.RegionServerThread t:
549 this.hbaseCluster.getRegionServers()) {
550 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
551 r.compactStores(major);
552 }
553 }
554 }
555
556
557
558
559
560 public void compact(TableName tableName, boolean major) throws IOException {
561 for (JVMClusterUtil.RegionServerThread t:
562 this.hbaseCluster.getRegionServers()) {
563 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
564 if(r.getTableDesc().getTableName().equals(tableName)) {
565 r.compactStores(major);
566 }
567 }
568 }
569 }
570
571
572
573
574 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
575 return this.hbaseCluster.getRegionServers();
576 }
577
578
579
580
581 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
582 return this.hbaseCluster.getLiveRegionServers();
583 }
584
585
586
587
588
589
590 public HRegionServer getRegionServer(int serverNumber) {
591 return hbaseCluster.getRegionServer(serverNumber);
592 }
593
594 public List<HRegion> getRegions(byte[] tableName) {
595 return getRegions(TableName.valueOf(tableName));
596 }
597
598 public List<HRegion> getRegions(TableName tableName) {
599 List<HRegion> ret = new ArrayList<HRegion>();
600 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
601 HRegionServer hrs = rst.getRegionServer();
602 for (HRegion region : hrs.getOnlineRegionsLocalContext()) {
603 if (region.getTableDesc().getTableName().equals(tableName)) {
604 ret.add(region);
605 }
606 }
607 }
608 return ret;
609 }
610
611
612
613
614
615 public int getServerWithMeta() {
616 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
617 }
618
619
620
621
622
623
624
625 public int getServerWith(byte[] regionName) {
626 int index = -1;
627 int count = 0;
628 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
629 HRegionServer hrs = rst.getRegionServer();
630 HRegion metaRegion =
631 hrs.getOnlineRegion(regionName);
632 if (metaRegion != null) {
633 index = count;
634 break;
635 }
636 count++;
637 }
638 return index;
639 }
640
641 @Override
642 public ServerName getServerHoldingRegion(byte[] regionName) throws IOException {
643 int index = getServerWith(regionName);
644 if (index < 0) {
645 return null;
646 }
647 return getRegionServer(index).getServerName();
648 }
649
650
651
652
653
654
655
656 public long countServedRegions() {
657 long count = 0;
658 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
659 count += rst.getRegionServer().getNumberOfOnlineRegions();
660 }
661 return count;
662 }
663
664
665
666
667
668 public void killAll() {
669 for (RegionServerThread rst : getRegionServerThreads()) {
670 rst.getRegionServer().abort("killAll");
671 }
672 for (MasterThread masterThread : getMasterThreads()) {
673 masterThread.getMaster().abort("killAll", new Throwable());
674 }
675 }
676
677 @Override
678 public void waitUntilShutDown() {
679 this.hbaseCluster.join();
680 }
681
682 public List<HRegion> findRegionsForTable(TableName tableName) {
683 ArrayList<HRegion> ret = new ArrayList<HRegion>();
684 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
685 HRegionServer hrs = rst.getRegionServer();
686 for (HRegion region : hrs.getOnlineRegions(tableName)) {
687 if (region.getTableDesc().getTableName().equals(tableName)) {
688 ret.add(region);
689 }
690 }
691 }
692 return ret;
693 }
694
695
696 protected int getRegionServerIndex(ServerName serverName) {
697
698 List<RegionServerThread> servers = getRegionServerThreads();
699 for (int i=0; i < servers.size(); i++) {
700 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
701 return i;
702 }
703 }
704 return -1;
705 }
706
707 protected int getMasterIndex(ServerName serverName) {
708 List<MasterThread> masters = getMasterThreads();
709 for (int i = 0; i < masters.size(); i++) {
710 if (masters.get(i).getMaster().getServerName().equals(serverName)) {
711 return i;
712 }
713 }
714 return -1;
715 }
716
717 @Override
718 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
719 return getRegionServer(getRegionServerIndex(serverName));
720 }
721
722 @Override
723 public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
724 throws IOException {
725 return getRegionServer(getRegionServerIndex(serverName));
726 }
727 }