View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.security.PrivilegedExceptionAction;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.client.HBaseAdmin;
33  import org.apache.hadoop.hbase.regionserver.HRegionServer;
34  import org.apache.hadoop.hbase.security.User;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
37  import org.apache.hadoop.hbase.util.Threads;
38  
39  import java.util.concurrent.CopyOnWriteArrayList;
40  import org.apache.hadoop.hbase.master.HMaster;
41  import org.apache.hadoop.hbase.util.JVMClusterUtil;
42  
43  /**
44   * This class creates a single process HBase cluster. One thread is created for
45   * a master and one per region server.
46   *
47   * Call {@link #startup()} to start the cluster running and {@link #shutdown()}
 * to close it all down. {@link #join} the cluster if you want to wait on
49   * shutdown completion.
50   *
51   * <p>Runs master on port 60000 by default.  Because we can't just kill the
52   * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
53   * be able to find the master with a remote client to run shutdown.  To use a
54   * port other than 60000, set the hbase.master to a value of 'local:PORT':
55   * that is 'local', not 'localhost', and the port number the master should use
56   * instead of 60000.
57   *
58   */
59  @InterfaceAudience.Public
60  @InterfaceStability.Evolving
61  public class LocalHBaseCluster {
62    static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
63    private final List<JVMClusterUtil.MasterThread> masterThreads =
64      new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
65    private final List<JVMClusterUtil.RegionServerThread> regionThreads =
66      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
67    private final static int DEFAULT_NO = 1;
68    /** local mode */
69    public static final String LOCAL = "local";
70    /** 'local:' */
71    public static final String LOCAL_COLON = LOCAL + ":";
72    private final Configuration conf;
73    private final Class<? extends HMaster> masterClass;
74    private final Class<? extends HRegionServer> regionServerClass;
75  
76    /**
77     * Constructor.
78     * @param conf
79     * @throws IOException
80     */
81    public LocalHBaseCluster(final Configuration conf)
82    throws IOException {
83      this(conf, DEFAULT_NO);
84    }
85  
86    /**
87     * Constructor.
88     * @param conf Configuration to use.  Post construction has the master's
89     * address.
90     * @param noRegionServers Count of regionservers to start.
91     * @throws IOException
92     */
93    public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
94    throws IOException {
95      this(conf, 1, noRegionServers, getMasterImplementation(conf),
96          getRegionServerImplementation(conf));
97    }
98  
99    /**
100    * Constructor.
101    * @param conf Configuration to use.  Post construction has the active master
102    * address.
103    * @param noMasters Count of masters to start.
104    * @param noRegionServers Count of regionservers to start.
105    * @throws IOException
106    */
107   public LocalHBaseCluster(final Configuration conf, final int noMasters,
108       final int noRegionServers)
109   throws IOException {
110     this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
111         getRegionServerImplementation(conf));
112   }
113 
114   @SuppressWarnings("unchecked")
115   private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
116     return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
117        HRegionServer.class);
118   }
119 
120   @SuppressWarnings("unchecked")
121   private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
122     return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
123        HMaster.class);
124   }
125 
126   /**
127    * Constructor.
128    * @param conf Configuration to use.  Post construction has the master's
129    * address.
130    * @param noMasters Count of masters to start.
131    * @param noRegionServers Count of regionservers to start.
132    * @param masterClass
133    * @param regionServerClass
134    * @throws IOException
135    */
136   @SuppressWarnings("unchecked")
137   public LocalHBaseCluster(final Configuration conf, final int noMasters,
138     final int noRegionServers, final Class<? extends HMaster> masterClass,
139     final Class<? extends HRegionServer> regionServerClass)
140   throws IOException {
141     this.conf = conf;
142     // Always have masters and regionservers come up on port '0' so we don't
143     // clash over default ports.
144     conf.set(HConstants.MASTER_PORT, "0");
145     conf.set(HConstants.REGIONSERVER_PORT, "0");
146     conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
147 
148     this.masterClass = (Class<? extends HMaster>)
149       conf.getClass(HConstants.MASTER_IMPL, masterClass);
150     // Start the HMasters.
151     for (int i = 0; i < noMasters; i++) {
152       addMaster(new Configuration(conf), i);
153     }
154     // Start the HRegionServers.
155     this.regionServerClass =
156       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
157        regionServerClass);
158 
159     for (int i = 0; i < noRegionServers; i++) {
160       addRegionServer(new Configuration(conf), i);
161     }
162   }
163 
164   public JVMClusterUtil.RegionServerThread addRegionServer()
165       throws IOException {
166     return addRegionServer(new Configuration(conf), this.regionThreads.size());
167   }
168 
169   @SuppressWarnings("unchecked")
170   public JVMClusterUtil.RegionServerThread addRegionServer(
171       Configuration config, final int index)
172   throws IOException {
173     // Create each regionserver with its own Configuration instance so each has
174     // its HConnection instance rather than share (see HBASE_INSTANCES down in
175     // the guts of HConnectionManager.
176     JVMClusterUtil.RegionServerThread rst =
177       JVMClusterUtil.createRegionServerThread(config,
178         (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
179           this.regionServerClass), index);
180     this.regionThreads.add(rst);
181     return rst;
182   }
183 
184   public JVMClusterUtil.RegionServerThread addRegionServer(
185       final Configuration config, final int index, User user)
186   throws IOException, InterruptedException {
187     return user.runAs(
188         new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
189           public JVMClusterUtil.RegionServerThread run() throws Exception {
190             return addRegionServer(config, index);
191           }
192         });
193   }
194 
195   public JVMClusterUtil.MasterThread addMaster() throws IOException {
196     return addMaster(new Configuration(conf), this.masterThreads.size());
197   }
198 
199   public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
200   throws IOException {
201     // Create each master with its own Configuration instance so each has
202     // its HConnection instance rather than share (see HBASE_INSTANCES down in
203     // the guts of HConnectionManager.
204     JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
205         (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
206     this.masterThreads.add(mt);
207     return mt;
208   }
209 
210   public JVMClusterUtil.MasterThread addMaster(
211       final Configuration c, final int index, User user)
212   throws IOException, InterruptedException {
213     return user.runAs(
214         new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
215           public JVMClusterUtil.MasterThread run() throws Exception {
216             return addMaster(c, index);
217           }
218         });
219   }
220 
221   /**
222    * @param serverNumber
223    * @return region server
224    */
225   public HRegionServer getRegionServer(int serverNumber) {
226     return regionThreads.get(serverNumber).getRegionServer();
227   }
228 
229   /**
230    * @return Read-only list of region server threads.
231    */
232   public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
233     return Collections.unmodifiableList(this.regionThreads);
234   }
235 
236   /**
237    * @return List of running servers (Some servers may have been killed or
238    * aborted during lifetime of cluster; these servers are not included in this
239    * list).
240    */
241   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
242     List<JVMClusterUtil.RegionServerThread> liveServers =
243       new ArrayList<JVMClusterUtil.RegionServerThread>();
244     List<RegionServerThread> list = getRegionServers();
245     for (JVMClusterUtil.RegionServerThread rst: list) {
246       if (rst.isAlive()) liveServers.add(rst);
247       else LOG.info("Not alive " + rst.getName());
248     }
249     return liveServers;
250   }
251 
252   /**
253    * @return the Configuration used by this LocalHBaseCluster
254    */
255   public Configuration getConfiguration() {
256     return this.conf;
257   }
258 
259   /**
260    * Wait for the specified region server to stop
261    * Removes this thread from list of running threads.
262    * @param serverNumber
263    * @return Name of region server that just went down.
264    */
265   public String waitOnRegionServer(int serverNumber) {
266     JVMClusterUtil.RegionServerThread regionServerThread =
267       this.regionThreads.remove(serverNumber);
268     while (regionServerThread.isAlive()) {
269       try {
270         LOG.info("Waiting on " +
271           regionServerThread.getRegionServer().toString());
272         regionServerThread.join();
273       } catch (InterruptedException e) {
274         e.printStackTrace();
275       }
276     }
277     return regionServerThread.getName();
278   }
279 
280   /**
281    * Wait for the specified region server to stop
282    * Removes this thread from list of running threads.
283    * @param rst
284    * @return Name of region server that just went down.
285    */
286   public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
287     while (rst.isAlive()) {
288       try {
289         LOG.info("Waiting on " +
290           rst.getRegionServer().toString());
291         rst.join();
292       } catch (InterruptedException e) {
293         e.printStackTrace();
294       }
295     }
296     for (int i=0;i<regionThreads.size();i++) {
297       if (regionThreads.get(i) == rst) {
298         regionThreads.remove(i);
299         break;
300       }
301     }
302     return rst.getName();
303   }
304 
305   /**
306    * @param serverNumber
307    * @return the HMaster thread
308    */
309   public HMaster getMaster(int serverNumber) {
310     return masterThreads.get(serverNumber).getMaster();
311   }
312 
313   /**
314    * Gets the current active master, if available.  If no active master, returns
315    * null.
316    * @return the HMaster for the active master
317    */
318   public HMaster getActiveMaster() {
319     for (JVMClusterUtil.MasterThread mt : masterThreads) {
320       if (mt.getMaster().isActiveMaster()) {
321         // Ensure that the current active master is not stopped.
322         // We don't want to return a stopping master as an active master.
323         if (mt.getMaster().isActiveMaster()  && !mt.getMaster().isStopped()) {
324           return mt.getMaster();
325         }
326       }
327     }
328     return null;
329   }
330 
331   /**
332    * @return Read-only list of master threads.
333    */
334   public List<JVMClusterUtil.MasterThread> getMasters() {
335     return Collections.unmodifiableList(this.masterThreads);
336   }
337 
338   /**
339    * @return List of running master servers (Some servers may have been killed
340    * or aborted during lifetime of cluster; these servers are not included in
341    * this list).
342    */
343   public List<JVMClusterUtil.MasterThread> getLiveMasters() {
344     List<JVMClusterUtil.MasterThread> liveServers =
345       new ArrayList<JVMClusterUtil.MasterThread>();
346     List<JVMClusterUtil.MasterThread> list = getMasters();
347     for (JVMClusterUtil.MasterThread mt: list) {
348       if (mt.isAlive()) {
349         liveServers.add(mt);
350       }
351     }
352     return liveServers;
353   }
354 
355   /**
356    * Wait for the specified master to stop
357    * Removes this thread from list of running threads.
358    * @param serverNumber
359    * @return Name of master that just went down.
360    */
361   public String waitOnMaster(int serverNumber) {
362     JVMClusterUtil.MasterThread masterThread =
363       this.masterThreads.remove(serverNumber);
364     while (masterThread.isAlive()) {
365       try {
366         LOG.info("Waiting on " +
367           masterThread.getMaster().getServerName().toString());
368         masterThread.join();
369       } catch (InterruptedException e) {
370         e.printStackTrace();
371       }
372     }
373     return masterThread.getName();
374   }
375 
376   /**
377    * Wait for the specified master to stop
378    * Removes this thread from list of running threads.
379    * @param masterThread
380    * @return Name of master that just went down.
381    */
382   public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
383     while (masterThread.isAlive()) {
384       try {
385         LOG.info("Waiting on " +
386           masterThread.getMaster().getServerName().toString());
387         masterThread.join();
388       } catch (InterruptedException e) {
389         e.printStackTrace();
390       }
391     }
392     for (int i=0;i<masterThreads.size();i++) {
393       if (masterThreads.get(i) == masterThread) {
394         masterThreads.remove(i);
395         break;
396       }
397     }
398     return masterThread.getName();
399   }
400 
401   /**
402    * Wait for Mini HBase Cluster to shut down.
403    * Presumes you've already called {@link #shutdown()}.
404    */
405   public void join() {
406     if (this.regionThreads != null) {
407       for(Thread t: this.regionThreads) {
408         if (t.isAlive()) {
409           try {
410             Threads.threadDumpingIsAlive(t);
411           } catch (InterruptedException e) {
412             LOG.debug("Interrupted", e);
413           }
414         }
415       }
416     }
417     if (this.masterThreads != null) {
418       for (Thread t : this.masterThreads) {
419         if (t.isAlive()) {
420           try {
421             Threads.threadDumpingIsAlive(t);
422           } catch (InterruptedException e) {
423             LOG.debug("Interrupted", e);
424           }
425         }
426       }
427     }
428   }
429 
430   /**
431    * Start the cluster.
432    */
433   public void startup() throws IOException {
434     JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
435   }
436 
437   /**
438    * Shut down the mini HBase cluster
439    */
440   public void shutdown() {
441     JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
442   }
443 
444   /**
445    * @param c Configuration to check.
446    * @return True if a 'local' address in hbase.master value.
447    */
448   public static boolean isLocal(final Configuration c) {
449     boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
450     return(mode == HConstants.CLUSTER_IS_LOCAL);
451   }
452 
453   /**
454    * Test things basically work.
455    * @param args
456    * @throws IOException
457    */
458   public static void main(String[] args) throws IOException {
459     Configuration conf = HBaseConfiguration.create();
460     LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
461     cluster.startup();
462     HBaseAdmin admin = new HBaseAdmin(conf);
463     HTableDescriptor htd =
464       new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
465     admin.createTable(htd);
466     cluster.shutdown();
467   }
468 }