/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Jdk14Logger;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.tool.Canary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

/**
 * Facility for testing HBase. Replacement for
 * old HBaseTestCase and HBaseClusterTestCase functionality.
 * Create an instance and keep it around while testing HBase.  This class is
 * meant to be your one-stop shop for anything you might need when testing.  Manages
 * one cluster at a time only. The managed cluster can be an in-process
 * {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}.
 * Not all methods work with the real cluster.
 * Depends on log4j being on the classpath and
 * hbase-site.xml for logging and test-run configuration.  It does not set
 * logging levels nor make changes to configuration parameters.
 * <p>To preserve test data directories, set the system property
 * "hbase.testing.preserve.testdir" to true.
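 * <p>A minimal usage sketch (all methods shown are declared in this class):
 * <pre>
 *   HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 *   TEST_UTIL.startMiniCluster();
 *   HTable t = TEST_UTIL.createTable(TableName.valueOf("test"), Bytes.toBytes("f"));
 *   // ... exercise the mini cluster ...
 *   TEST_UTIL.shutdownMiniCluster();
 * </pre>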
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HBaseTestingUtility extends HBaseCommonTestingUtility {
  private MiniZooKeeperCluster zkCluster = null;

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split
   * table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 5;

  /**
   * Set if we were passed a zkCluster.  If so, we won't shut down zk as
   * part of general shutdown.
   */
  private boolean passedZkCluster = false;
  private MiniDFSCluster dfsCluster = null;

  private HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private boolean miniClusterRunning;

  private String hadoopLogDir;

  /** Directory (a subdirectory of dataTestDir) used by the dfs cluster, if any */
  private File clusterTestDir = null;

  /** Directory on the test filesystem where we put the data for this instance of
   * HBaseTestingUtility */
  private Path dataTestDirOnTestFS = null;

  /**
   * System property key to get the test directory value.
   * The name is as it is because mini dfs has hard-codings to put test data here.
   * It should NOT be used directly in HBase, as it's a property used by
   * mini dfs.
   * @deprecated can be used only with mini dfs
   */
  @Deprecated
  private static final String TEST_DIRECTORY_KEY = "test.build.data";

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** A set of ports that have been claimed using {@link #randomFreePort()}. */
  private static final Set<Integer> takenRandomPorts = new HashSet<Integer>();

  /** Compression algorithms to use in parameterized JUnit 4 tests */
  public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
    Arrays.asList(new Object[][] {
      { Compression.Algorithm.NONE },
      { Compression.Algorithm.GZ }
    });

  /** This is for unit tests parameterized with two booleans. */
  public static final List<Object[]> BOOLEAN_PARAMETERIZED =
      Arrays.asList(new Object[][] {
          { Boolean.FALSE },
          { Boolean.TRUE }
      });

  /** This is for unit tests parameterized with the combinations of memstoreTS and tags. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();

  /** Compression algorithms to use in testing */
  public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      Compression.Algorithm.NONE, Compression.Algorithm.GZ
    };

  /**
   * Create all combinations of Bloom filters and compression algorithms for
   * testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<Object[]>();
    for (Compression.Algorithm comprAlgo :
         HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags for testing.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<Object[]>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

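  /**
   * All combinations of Bloom filter type and compression algorithm, for
   * parameterized tests. A minimal consumption sketch (the surrounding JUnit 4
   * test class is hypothetical):
   * <pre>
   *   &#064;Parameterized.Parameters
   *   public static Collection&lt;Object[]&gt; parameters() {
   *     return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
   *   }
   * </pre>
   */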
  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
      bloomAndCompressionCombinations();

  public HBaseTestingUtility() {
    this(HBaseConfiguration.create());
  }

  public HBaseTestingUtility(Configuration conf) {
    super(conf);

    // An HBase checksum verification failure will cause unit tests to fail.
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
  }

  /**
   * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
   * It is needed to properly base FSUtil.getRootDirs so that they drop temp files in the proper
   * test dir.  Use this when you aren't using a mini HDFS cluster.
   * @return HBaseTestingUtility that uses local fs for temp files.
   */
  public static HBaseTestingUtility createLocalHTU() {
    Configuration c = HBaseConfiguration.create();
    return createLocalHTU(c);
  }

  /**
   * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
   * It is needed to properly base FSUtil.getRootDirs so that they drop temp files in the proper
   * test dir.  Use this when you aren't using a mini HDFS cluster.
   * @param c Configuration (will be modified)
   * @return HBaseTestingUtility that uses local fs for temp files.
   */
  public static HBaseTestingUtility createLocalHTU(Configuration c) {
    HBaseTestingUtility htu = new HBaseTestingUtility(c);
    String dataTestDir = htu.getDataTestDir().toString();
    htu.getConfiguration().set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting " + HConstants.HBASE_DIR + " to " + dataTestDir);
    return htu;
  }

  /**
   * Controls how many attempts we will make in the face of failures in HDFS.
   * @deprecated to be removed with Hadoop 1.x support
   */
  @Deprecated
  public void setHDFSClientRetry(final int retries) {
    this.conf.setInt("hdfs.client.retries.number", retries);
    if (0 == retries) {
      makeDFSClientNonRetrying();
    }
  }

  /**
   * Returns this class's instance of {@link Configuration}.  Be careful how
   * you use the returned Configuration since {@link HConnection} instances
   * can be shared.  The Map of HConnections is keyed by the Configuration.  If
   * say, a Connection was being used against a cluster that had been shutdown,
   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
   * be wholesome.  Rather than using the returned Configuration directly, it's
   * usually best to make a copy and use that.  Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
   * Give it a random name so we can have many concurrent tests running if
   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
   * System property, as it's what minidfscluster bases
   * its data dir on.  Modifying a System property is not the way to do concurrent
   * instances -- another instance could grab the temporary
   * value unintentionally -- but nothing can be done about it at the moment;
   * single instance only is how the minidfscluster works.
   *
   * We also create the underlying directory for
   *  hadoop.log.dir, mapred.local.dir and hadoop.tmp.dir, and set the values
   *  in the conf, and as a system property for hadoop.tmp.dir
   *
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty(
      "hadoop.log.dir",
      testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    //  we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty(
      "hadoop.tmp.dir",
      testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir(
      "mapred.local.dir",
      testPath, "mapred-local-dir");

    return testPath;
  }

  private void createSubDirAndSystemProperty(
    String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      //  that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " +
        sysValue + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(
          propertyName + " property value differs in configuration and system: " +
          "Configuration=" + confValue + " while System=" + sysValue +
          " Overriding the configuration value with the system value."
        );
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; Returns working directory
   * for the test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * @return META table descriptor
   */
  public HTableDescriptor getMetaTableDescriptor() {
    try {
      return new FSTableDescriptors(conf).get(TableName.META_TABLE_NAME);
    } catch (IOException e) {
      throw new RuntimeException("Unable to create META table descriptor", e);
    }
  }

  /**
   * @return Where the DFS cluster will write data on the local subsystem.
   * Creates it if it does not exist already.  A subdir of {@link #getBaseTestDir()}
   * @see #getTestFileSystem()
   */
  Path getClusterTestDir() {
    if (clusterTestDir == null) {
      setupClusterTestDir();
    }
    return new Path(clusterTestDir.getAbsolutePath());
  }

  /**
   * Creates a directory for the DFS cluster, under the test data directory.
   */
  private void setupClusterTestDir() {
    if (clusterTestDir != null) {
      return;
    }

    // Using randomUUID ensures that multiple clusters can be launched by
    //  the same test, if it stops and starts them
    Path testDir = getDataTestDir("dfscluster_" + UUID.randomUUID().toString());
    clusterTestDir = new File(testDir.toString()).getAbsoluteFile();
    // Have it cleaned up on exit
    boolean b = deleteOnExit();
    if (b) clusterTestDir.deleteOnExit();
    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    LOG.info("Created new mini-cluster data directory: " + clusterTestDir + ", deleteOnExit=" + b);
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()},
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()},
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up in "
          + dataTestDirOnTestFS.toString());
      return;
    }

    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      File dataTestDir = new File(getDataTestDir().toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
      dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath());
    } else {
      Path base = getBaseTestDirOnTestFS();
      String randomStr = UUID.randomUUID().toString();
      dataTestDirOnTestFS = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(dataTestDirOnTestFS);
    }
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   * @throws IOException
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret)
      dataTestDirOnTestFS = null;
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed the child
   * @throws IOException
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names, all instances of the
   * datanodes will have the same host name.
   * @param hosts hostnames for the DNs to run on.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
  throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param hosts hostnames for the DNs to run on.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
  throws Exception {
    createDirsAndSetProperties();
    try {
      Method m = Class.forName("org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream")
          .getMethod("setShouldSkipFsyncForTesting", new Class<?>[] { boolean.class });
      m.invoke(null, new Object[] { true });
    } catch (ClassNotFoundException e) {
      LOG.info("EditLogFileOutputStream not found");
    }

    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
        setLevel(org.apache.log4j.Level.ERROR);
    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
        setLevel(org.apache.log4j.Level.ERROR);

    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
      true, null, null, hosts, null);

    // Set this just-started cluster as our filesystem.
    FileSystem fs = this.dfsCluster.getFileSystem();
    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // Reset the test directory for the test file system
    dataTestDirOnTestFS = null;

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
      throws Exception {
    createDirsAndSetProperties();
    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
        true, null, racks, hosts, null);

    // Set this just-started cluster as our filesystem.
    FileSystem fs = this.dfsCluster.getFileSystem();
    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // Reset the test directory for the test file system
    dataTestDirOnTestFS = null;

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
        null, null, null);
    return dfsCluster;
  }

  /** This is used before starting HDFS and map-reduce mini-clusters */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    createDirAndSetProperty("cache_data", "test.cache.data");
    createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
    createDirAndSetProperty("mapred_local", "mapred.local.dir");
    createDirAndSetProperty("mapred_temp", "mapred.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapred.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapred.working.dir", new Path(root, "mapred-working-dir").toString());
  }

  /**
   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   *  This allows specifying this parameter on the command line.
   *  If not set, the default is false (matching the conf lookup below).
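   *  For example, to turn short-circuit reads off for a single run (e.g. with
   *  Maven; the property name is the one read by this method):
   *  <pre>
   *    mvn test -Dhbase.tests.use.shortcircuit.reads=false
   *  </pre>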
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /** Enable short-circuit reads, unless configured differently.
   * Sets both HBase and HDFS settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)},
   * or does nothing.
   * @throws IOException
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      dataTestDirOnTestFS = null;
      FSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Call this if you only want a zk cluster.
   * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
   * @throws Exception
   * @see #shutdownMiniZKCluster()
   * @return zk cluster started.
   */
  public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
    return startMiniZKCluster(1);
  }

  /**
   * Call this if you only want a zk cluster.
   * @param zooKeeperServerNum number of ZooKeeper servers to start.
   * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
   * @throws Exception
   * @see #shutdownMiniZKCluster()
   * @return zk cluster started.
   */
  public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum)
      throws Exception {
    setupClusterTestDir();
    return startMiniZKCluster(clusterTestDir, zooKeeperServerNum);
  }

  private MiniZooKeeperCluster startMiniZKCluster(final File dir)
    throws Exception {
    return startMiniZKCluster(dir, 1);
  }

  /**
   * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set,
   * the port mentioned is used as the default port for ZooKeeper.
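   * For example, to pin the mini ZK cluster to a fixed client port (a sketch; any
   * free port would do):
   * <pre>
   *   conf.setInt("test.hbase.zookeeper.property.clientPort", 21818);
   * </pre>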
   */
  private MiniZooKeeperCluster startMiniZKCluster(final File dir,
      int zooKeeperServerNum)
  throws Exception {
    if (this.zkCluster != null) {
      throw new IOException("Cluster already running at " + dir);
    }
    this.passedZkCluster = false;
    this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
    final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
    if (defPort > 0) {
      // If there is a port in the config file, we use it.
      this.zkCluster.setDefaultClientPort(defPort);
    }
    int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum);
    this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
      Integer.toString(clientPort));
    return this.zkCluster;
  }

  /**
   * Shuts down the zk cluster created by a call to {@link #startMiniZKCluster(File)},
   * or does nothing.
   * @throws IOException
   * @see #startMiniZKCluster()
   */
  public void shutdownMiniZKCluster() throws IOException {
    if (this.zkCluster != null) {
      this.zkCluster.shutdown();
      this.zkCluster = null;
    }
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper.
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(1, 1);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under the System property test.build.data.
   * The directory is cleaned up on exit.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * you will get bind errors.
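   * <p>For example, a three-slave cluster (three regionservers and three datanodes;
   * a minimal sketch using this overload):
   * <pre>
   *   MiniHBaseCluster cluster = startMiniCluster(3);
   * </pre>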
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves)
  throws Exception {
    return startMiniCluster(1, numSlaves);
  }

  /**
   * Start minicluster.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves)
  throws Exception {
    return startMiniCluster(numMasters, numSlaves, null);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under the System property test.build.data.
   * The directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * regionservers. If dataNodeHosts == null, this also indicates the number of
   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * you will get bind errors.
   * @param dataNodeHosts hostnames for the DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts) throws Exception {
    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts, null, null);
  }

  /**
   * Same as {@link #startMiniCluster(int, int)}, but with a custom number of datanodes.
   * @param numDataNodes Number of data nodes.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final int numDataNodes) throws Exception {
    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under the System property test.build.data.
   * The directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * regionservers. If dataNodeHosts == null, this also indicates the number of
   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * you will get bind errors.
   * @param dataNodeHosts hostnames for the DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param masterClass The class to use as HMaster, or null for default
   * @param regionserverClass The class to use as HRegionServer, or null for
   * default
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
          throws Exception {
    return startMiniCluster(
        numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
  }

  /**
   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with a custom
   * number of datanodes.
   * @param numDataNodes Number of data nodes.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
    Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
  throws Exception {
    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
      numDataNodes = dataNodeHosts.length;
    }

    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
        numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    startMiniDFSCluster(numDataNodes, dataNodeHosts);

    // Start up a zk cluster.
    if (this.zkCluster == null) {
      startMiniZKCluster(clusterTestDir);
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(numMasters, numSlaves, masterClass, regionserverClass);
  }

  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
      throws IOException, InterruptedException {
    return startMiniHBaseCluster(numMasters, numSlaves, null, null);
  }

  /**
   * Starts up mini hbase cluster.  Usually used after call to
   * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
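   * <p>A stepped-startup sketch (dfs and zk first, then hbase; all methods shown
   * are declared in this class):
   * <pre>
   *   startMiniDFSCluster(1);
   *   startMiniZKCluster();
   *   startMiniHBaseCluster(1, 1);
   * </pre>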
   * @param numMasters number of masters to start.
   * @param numSlaves number of region servers to start.
   * @return Reference to the hbase mini hbase cluster.
   * @throws IOException
   * @throws InterruptedException
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
        final int numSlaves, Class<? extends HMaster> masterClass,
        Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
  throws IOException, InterruptedException {
    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
    createRootDir();

    // These settings will make the master wait until this exact number of
    // region servers have connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster =
        new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
    // Don't leave here till we've done a successful scan of the hbase:meta
    HTable t = new HTable(c, TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      continue;
    }
    s.close();
    t.close();

    getHBaseAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up");
    return (MiniHBaseCluster) this.hbaseCluster;
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a
   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
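   * <p>For example (a sketch; both methods are declared in this class):
   * <pre>
   *   shutdownMiniHBaseCluster();
   *   // dfs and zk are still up
   *   restartHBaseCluster(2);
   * </pre>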
   * @param servers number of region servers
   * @throws IOException
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
    // Don't leave here till we've done a successful scan of the hbase:meta
    HTable t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
  }

  /**
   * @return Current mini hbase cluster. Only has something in it after a call
   * to {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
      return (MiniHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(hbaseCluster + " not an instance of " +
                               MiniHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @throws IOException
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws Exception {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    if (!this.passedZkCluster) {
      shutdownMiniZKCluster();
    }
    shutdownMiniDFSCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * @return True if we removed the test dirs
   * @throws IOException
   */
  @Override
  public boolean cleanupTestDir() throws IOException {
    boolean ret = super.cleanupTestDir();
    if (deleteDir(this.clusterTestDir)) {
      this.clusterTestDir = null;
      return ret;
    }
    return false;
  }

  /**
   * Shutdown HBase mini cluster.  Does not shutdown zk or dfs if running.
   * @throws IOException
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    if (hbaseAdmin != null) {
      hbaseAdmin.close0();
      hbaseAdmin = null;
    }

    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }

    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Returns the path to the default root dir the minicluster uses.
   * Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   * @throws IOException
   */
  public Path getDefaultRootDirPath() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    return new Path(fs.makeQualified(fs.getHomeDirectory()), "hbase");
  }

  /**
   * Creates an hbase rootdir in the user home directory.  Also creates the hbase
   * version file.  Normally you won't make use of this method.  The root hbasedir
   * is created for you as part of mini cluster startup.  You'd only use this
   * method if you were doing manual operation.
   * @return Fully qualified path to hbase root dir
   * @throws IOException
   */
  public Path createRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath();
    FSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Flushes all caches in the mini hbase cluster
   * @throws IOException
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches of the specified table in the mini hbase cluster
   * @throws IOException
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   * @throws IOException
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   * @throws IOException
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

1090   /**
1091    * Create a table.
1092    * @param tableName
1093    * @param family
1094    * @return An HTable instance for the created table.
1095    * @throws IOException
1096    */
1097   public HTable createTable(String tableName, String family)
1098   throws IOException{
1099     return createTable(TableName.valueOf(tableName), new String[]{family});
1100   }
1101 
1102   /**
1103    * Create a table.
1104    * @param tableName
1105    * @param family
1106    * @return An HTable instance for the created table.
1107    * @throws IOException
1108    */
1109   public HTable createTable(byte[] tableName, byte[] family)
1110   throws IOException{
1111     return createTable(TableName.valueOf(tableName), new byte[][]{family});
1112   }
1113 
1114   /**
1115    * Create a table.
1116    * @param tableName
1117    * @param families
1118    * @return An HTable instance for the created table.
1119    * @throws IOException
1120    */
1121   public HTable createTable(TableName tableName, String[] families)
1122   throws IOException {
1123     List<byte[]> fams = new ArrayList<byte[]>(families.length);
1124     for (String family : families) {
1125       fams.add(Bytes.toBytes(family));
1126     }
1127     return createTable(tableName, fams.toArray(new byte[0][]));
1128   }
1129 
1130   /**
1131    * Create a table.
1132    * @param tableName
1133    * @param family
1134    * @return An HTable instance for the created table.
1135    * @throws IOException
1136    */
1137   public HTable createTable(TableName tableName, byte[] family)
1138   throws IOException{
1139     return createTable(tableName, new byte[][]{family});
1140   }
1141 
1142 
1143   /**
1144    * Create a table.
1145    * @param tableName
1146    * @param families
1147    * @return An HTable instance for the created table.
1148    * @throws IOException
1149    */
1150   public HTable createTable(byte[] tableName, byte[][] families)
1151   throws IOException {
1152     return createTable(tableName, families,
1153         new Configuration(getConfiguration()));
1154   }
1155 
1156   /**
1157    * Create a table.
1158    * @param tableName
1159    * @param families
1160    * @return An HTable instance for the created table.
1161    * @throws IOException
1162    */
1163   public HTable createTable(TableName tableName, byte[][] families)
1164   throws IOException {
1165     return createTable(tableName, families,
1166         new Configuration(getConfiguration()));
1167   }
1168 
1169   public HTable createTable(byte[] tableName, byte[][] families,
1170       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1171     return createTable(TableName.valueOf(tableName), families, numVersions,
1172         startKey, endKey, numRegions);
1173   }
1174 
1175   public HTable createTable(String tableName, byte[][] families,
1176       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1177     return createTable(TableName.valueOf(tableName), families, numVersions,
1178         startKey, endKey, numRegions);
1179   }
1180 
1181   public HTable createTable(TableName tableName, byte[][] families,
1182       int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1183   throws IOException{
1184     HTableDescriptor desc = new HTableDescriptor(tableName);
1185     for (byte[] family : families) {
1186       HColumnDescriptor hcd = new HColumnDescriptor(family)
1187           .setMaxVersions(numVersions);
1188       desc.addFamily(hcd);
1189     }
1190     getHBaseAdmin().createTable(desc, startKey, endKey, numRegions);
1191     // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
1192     waitUntilAllRegionsAssigned(tableName);
1193     return new HTable(getConfiguration(), tableName);
1194   }
1195 
1196   /**
1197    * Create a table.
1198    * @param htd
1199    * @param families
1200    * @param c Configuration to use
1201    * @return An HTable instance for the created table.
1202    * @throws IOException
1203    */
1204   public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c)
1205   throws IOException {
1206     for(byte[] family : families) {
1207       HColumnDescriptor hcd = new HColumnDescriptor(family);
1208       // Disable blooms (they are on by default as of 0.95) but we disable them here because
1209       // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1210       // on is interfering.
1211       hcd.setBloomFilterType(BloomType.NONE);
1212       htd.addFamily(hcd);
1213     }
1214     getHBaseAdmin().createTable(htd);
1215     // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
1216     waitUntilAllRegionsAssigned(htd.getTableName());
1217     return new HTable(c, htd.getTableName());
1218   }
1219 
1220   /**
1221    * Create a table.
1222    * @param htd
1223    * @param splitRows
1224    * @return An HTable instance for the created table.
1225    * @throws IOException
1226    */
1227   public HTable createTable(HTableDescriptor htd, byte[][] splitRows)
1228       throws IOException {
1229     getHBaseAdmin().createTable(htd, splitRows);
1230     // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
1231     waitUntilAllRegionsAssigned(htd.getTableName());
1232     return new HTable(getConfiguration(), htd.getTableName());
1233   }
1234 
1235   /**
1236    * Create a table.
1237    * @param tableName
1238    * @param families
1239    * @param c Configuration to use
1240    * @return An HTable instance for the created table.
1241    * @throws IOException
1242    */
1243   public HTable createTable(TableName tableName, byte[][] families,
1244       final Configuration c)
1245   throws IOException {
1246     return createTable(new HTableDescriptor(tableName), families, c);
1247   }
1248 
1249   /**
1250    * Create a table.
1251    * @param tableName
1252    * @param families
1253    * @param c Configuration to use
1254    * @return An HTable instance for the created table.
1255    * @throws IOException
1256    */
1257   public HTable createTable(byte[] tableName, byte[][] families,
1258       final Configuration c)
1259   throws IOException {
1260     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1261     for(byte[] family : families) {
1262       HColumnDescriptor hcd = new HColumnDescriptor(family);
1263       // Disable blooms (they are on by default as of 0.95) but we disable them here because
1264       // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1265       // on is interfering.
1266       hcd.setBloomFilterType(BloomType.NONE);
1267       desc.addFamily(hcd);
1268     }
1269     getHBaseAdmin().createTable(desc);
1270     return new HTable(c, tableName);
1271   }
1272 
1273   /**
1274    * Create a table.
1275    * @param tableName
1276    * @param families
1277    * @param c Configuration to use
1278    * @param numVersions
1279    * @return An HTable instance for the created table.
1280    * @throws IOException
1281    */
1282   public HTable createTable(TableName tableName, byte[][] families,
1283       final Configuration c, int numVersions)
1284   throws IOException {
1285     HTableDescriptor desc = new HTableDescriptor(tableName);
1286     for(byte[] family : families) {
1287       HColumnDescriptor hcd = new HColumnDescriptor(family)
1288           .setMaxVersions(numVersions);
1289       desc.addFamily(hcd);
1290     }
1291     getHBaseAdmin().createTable(desc);
1292     // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
1293     waitUntilAllRegionsAssigned(tableName);
1294     return new HTable(c, tableName);
1295   }
1296 
1297   /**
1298    * Create a table.
1299    * @param tableName
1300    * @param families
1301    * @param c Configuration to use
1302    * @param numVersions
1303    * @return An HTable instance for the created table.
1304    * @throws IOException
1305    */
1306   public HTable createTable(byte[] tableName, byte[][] families,
1307       final Configuration c, int numVersions)
1308   throws IOException {
1309     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1310     for(byte[] family : families) {
1311       HColumnDescriptor hcd = new HColumnDescriptor(family)
1312           .setMaxVersions(numVersions);
1313       desc.addFamily(hcd);
1314     }
1315     getHBaseAdmin().createTable(desc);
1316     return new HTable(c, tableName);
1317   }
1318 
1319   /**
1320    * Create a table.
1321    * @param tableName
1322    * @param family
1323    * @param numVersions
1324    * @return An HTable instance for the created table.
1325    * @throws IOException
1326    */
1327   public HTable createTable(byte[] tableName, byte[] family, int numVersions)
1328   throws IOException {
1329     return createTable(tableName, new byte[][]{family}, numVersions);
1330   }
1331 
1332   /**
1333    * Create a table.
1334    * @param tableName
1335    * @param family
1336    * @param numVersions
1337    * @return An HTable instance for the created table.
1338    * @throws IOException
1339    */
1340   public HTable createTable(TableName tableName, byte[] family, int numVersions)
1341   throws IOException {
1342     return createTable(tableName, new byte[][]{family}, numVersions);
1343   }
1344 
1345   /**
1346    * Create a table.
1347    * @param tableName
1348    * @param families
1349    * @param numVersions
1350    * @return An HTable instance for the created table.
1351    * @throws IOException
1352    */
1353   public HTable createTable(byte[] tableName, byte[][] families,
1354       int numVersions)
1355   throws IOException {
1356     return createTable(TableName.valueOf(tableName), families, numVersions);
1357   }
1358 
1359   /**
1360    * Create a table.
1361    * @param tableName
1362    * @param families
1363    * @param numVersions
1364    * @return An HTable instance for the created table.
1365    * @throws IOException
1366    */
1367   public HTable createTable(TableName tableName, byte[][] families,
1368       int numVersions)
1369   throws IOException {
1370     HTableDescriptor desc = new HTableDescriptor(tableName);
1371     for (byte[] family : families) {
1372       HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1373       desc.addFamily(hcd);
1374     }
1375     getHBaseAdmin().createTable(desc);
1376     // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
1377     waitUntilAllRegionsAssigned(tableName);
1378     return new HTable(new Configuration(getConfiguration()), tableName);
1379   }
1380 
1381   /**
1382    * Create a table.
1383    * @param tableName
1384    * @param families
1385    * @param numVersions
1386    * @return An HTable instance for the created table.
1387    * @throws IOException
1388    */
1389   public HTable createTable(byte[] tableName, byte[][] families,
1390     int numVersions, int blockSize) throws IOException {
1391     return createTable(TableName.valueOf(tableName),
1392         families, numVersions, blockSize);
1393   }
1394 
1395   /**
1396    * Create a table.
1397    * @param tableName Name of the table to create.
1398    * @param families Column families for the table.
1399    * @param numVersions Maximum number of versions to keep per column family.
        * @param blockSize Block size to set on each column family.
1400    * @return An HTable instance for the created table.
1401    * @throws IOException if the table cannot be created.
1402    */
1403   public HTable createTable(TableName tableName, byte[][] families,
1404     int numVersions, int blockSize) throws IOException {
1405     HTableDescriptor desc = new HTableDescriptor(tableName);
1406     for (byte[] family : families) {
1407       HColumnDescriptor hcd = new HColumnDescriptor(family)
1408           .setMaxVersions(numVersions)
1409           .setBlocksize(blockSize);
1410       desc.addFamily(hcd);
1411     }
1412     getHBaseAdmin().createTable(desc);
1413     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned.
1414     waitUntilAllRegionsAssigned(tableName);
1415     return new HTable(new Configuration(getConfiguration()), tableName);
1416   }
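
  // Illustrative sketch, not part of the original source: a small block size such
  // as 1024 bytes makes a table produce many HFile blocks from little data, which
  // is handy for block-cache and seek tests. The table name is hypothetical;
  // "util" is an HBaseTestingUtility with a running mini cluster.
  //
  //   HTable t = util.createTable(TableName.valueOf("testSmallBlocks"),
  //       new byte[][] { fam1, fam2 }, 1, 1024);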
1417 
1418   /**
1419    * Create a table.
1420    * @param tableName Name of the table to create.
1421    * @param families Column families for the table.
1422    * @param numVersions Maximum versions to keep, one entry per corresponding family.
1423    * @return An HTable instance for the created table.
1424    * @throws IOException if the table cannot be created.
1425    */
1426   public HTable createTable(byte[] tableName, byte[][] families,
1427       int[] numVersions)
1428   throws IOException {
1429     return createTable(TableName.valueOf(tableName), families, numVersions);
1430   }
1431 
1432   /**
1433    * Create a table.
1434    * @param tableName Name of the table to create.
1435    * @param families Column families for the table.
1436    * @param numVersions Maximum versions to keep, one entry per corresponding family.
1437    * @return An HTable instance for the created table.
1438    * @throws IOException if the table cannot be created.
1439    */
1440   public HTable createTable(TableName tableName, byte[][] families,
1441       int[] numVersions)
1442   throws IOException {
1443     HTableDescriptor desc = new HTableDescriptor(tableName);
1444     int i = 0;
1445     for (byte[] family : families) {
1446       HColumnDescriptor hcd = new HColumnDescriptor(family)
1447           .setMaxVersions(numVersions[i]);
1448       desc.addFamily(hcd);
1449       i++;
1450     }
1451     getHBaseAdmin().createTable(desc);
1452     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned.
1453     waitUntilAllRegionsAssigned(tableName);
1454     return new HTable(new Configuration(getConfiguration()), tableName);
1455   }
1456 
1457   /**
1458    * Create a table.
1459    * @param tableName Name of the table to create.
1460    * @param family Column family for the table.
1461    * @param splitRows Split points; n split rows yield n + 1 regions.
1462    * @return An HTable instance for the created table.
1463    * @throws IOException if the table cannot be created.
1464    */
1465   public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
1466     throws IOException{
1467     return createTable(TableName.valueOf(tableName), family, splitRows);
1468   }
1469 
1470   /**
1471    * Create a table.
1472    * @param tableName Name of the table to create.
1473    * @param family Column family for the table.
1474    * @param splitRows Split points; n split rows yield n + 1 regions.
1475    * @return An HTable instance for the created table.
1476    * @throws IOException if the table cannot be created.
1477    */
1478   public HTable createTable(TableName tableName, byte[] family, byte[][] splitRows)
1479       throws IOException {
1480     HTableDescriptor desc = new HTableDescriptor(tableName);
1481     HColumnDescriptor hcd = new HColumnDescriptor(family);
1482     desc.addFamily(hcd);
1483     getHBaseAdmin().createTable(desc, splitRows);
1484     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned.
1485     waitUntilAllRegionsAssigned(tableName);
1486     return new HTable(getConfiguration(), tableName);
1487   }
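
  // Illustrative sketch, not part of the original source: pre-splitting with the
  // canned KEYS_FOR_HBA_CREATE_TABLE split points; n split rows yield n + 1
  // regions. The table name is hypothetical; "util" is an HBaseTestingUtility
  // with a running mini cluster.
  //
  //   HTable t = util.createTable(TableName.valueOf("testPresplit"), fam1,
  //       HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);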
1488 
1489   /**
1490    * Create a table.
1491    * @param tableName Name of the table to create.
1492    * @param families Column families for the table.
1493    * @param splitRows Split points; n split rows yield n + 1 regions.
1494    * @return An HTable instance for the created table.
1495    * @throws IOException if the table cannot be created.
1496    */
1497   public HTable createTable(byte[] tableName, byte[][] families, byte[][] splitRows)
1498       throws IOException {
1499     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1500     for(byte[] family:families) {
1501       HColumnDescriptor hcd = new HColumnDescriptor(family);
1502       desc.addFamily(hcd);
1503     }
1504     getHBaseAdmin().createTable(desc, splitRows);
1505     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned.
1506     waitUntilAllRegionsAssigned(TableName.valueOf(tableName));
1507     return new HTable(getConfiguration(), tableName);
1508   }
1509 
1510   /**
1511    * Drop an existing table
1512    * @param tableName existing table
1513    */
1514   public void deleteTable(String tableName) throws IOException {
1515     deleteTable(TableName.valueOf(tableName));
1516   }
1517 
1518   /**
1519    * Drop an existing table
1520    * @param tableName existing table
1521    */
1522   public void deleteTable(byte[] tableName) throws IOException {
1523     deleteTable(TableName.valueOf(tableName));
1524   }
1525 
1526   /**
1527    * Drop an existing table
1528    * @param tableName existing table
1529    */
1530   public void deleteTable(TableName tableName) throws IOException {
1531     try {
1532       getHBaseAdmin().disableTable(tableName);
1533     } catch (TableNotEnabledException e) {
1534       LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1535     }
1536     getHBaseAdmin().deleteTable(tableName);
1537   }
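
  // Illustrative sketch, not part of the original source: deleteTable disables the
  // table first when needed, so teardown code does not have to check table state.
  // The table name is hypothetical.
  //
  //   util.deleteTable(TableName.valueOf("testVersions"));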
1538 
1539   // ==========================================================================
1540   // Canned table and table descriptor creation
1541   // TODO replace HBaseTestCase
1542 
1543   public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1544   public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1545   public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1546   public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1547   private static final int MAXVERSIONS = 3;
1548 
1549   public static final char FIRST_CHAR = 'a';
1550   public static final char LAST_CHAR = 'z';
1551   public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1552   public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1553 
1554   /**
1555    * Create a table descriptor for a table of name <code>name</code> with
1556    * {@link #COLUMNS} for families.
1557    * @param name Name to give the table.
        * @param minVersions Minimum number of versions to keep per column family.
1558    * @param versions How many versions to allow per column.
        * @param ttl Time-to-live for cells, in seconds.
        * @param keepDeleted Whether deleted cells are retained.
1559    * @return A table descriptor with the requested settings.
1560    */
1561   public HTableDescriptor createTableDescriptor(final String name,
1562       final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1563     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
1564     for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1565       htd.addFamily(new HColumnDescriptor(cfName)
1566           .setMinVersions(minVersions)
1567           .setMaxVersions(versions)
1568           .setKeepDeletedCells(keepDeleted)
1569           .setBlockCacheEnabled(false)
1570           .setTimeToLive(ttl)
1571       );
1572     }
1573     return htd;
1574   }
1575 
1576   /**
1577    * Create a table descriptor for a table of name <code>name</code> with
1578    * {@link #COLUMNS} for families and default min/max versions, TTL, and
        * keep-deleted-cells settings.
1579    * @param name Name to give the table.
1580    * @return A table descriptor with default settings.
1581    */
1582   public HTableDescriptor createTableDescriptor(final String name) {
1583     return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1584         MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1585   }
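
  // Illustrative sketch, not part of the original source: building a descriptor
  // with the canned COLUMNS families and creating the table through the shared
  // admin. The name "testDescriptor" is hypothetical.
  //
  //   HTableDescriptor htd = util.createTableDescriptor("testDescriptor");
  //   util.getHBaseAdmin().createTable(htd);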
1586 
1587   /**
1588    * Create an HRegion that writes to the local tmp dirs.
1589    * @param desc Table descriptor for the region's table.
1590    * @param startKey Start key of the region; may be null.
1591    * @param endKey End key of the region; may be null.
1592    * @return The created HRegion.
1593    * @throws IOException if region creation fails.
1594    */
1595   public HRegion createLocalHRegion(HTableDescriptor desc, byte [] startKey,
1596       byte [] endKey)
1597   throws IOException {
1598     HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
1599     return createLocalHRegion(hri, desc);
1600   }
1601 
1602   /**
1603    * Create an HRegion that writes to the local tmp dirs.
1604    * @param info Region info for the region to create.
1605    * @param desc Table descriptor for the region's table.
1606    * @return The created HRegion.
1607    * @throws IOException if region creation fails.
1608    */
1609   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc) throws IOException {
1610     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc);
1611   }
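
  // Illustrative sketch, not part of the original source: a region backed by the
  // local test dirs, usable without any cluster. The caller closes the region;
  // the descriptor name is hypothetical.
  //
  //   HTableDescriptor htd = util.createTableDescriptor("testLocalRegion");
  //   HRegion region = util.createLocalHRegion(htd, null, null);
  //   try {
  //     region.put(new Put(Bytes.toBytes("row")).add(fam1, null, Bytes.toBytes("v")));
  //   } finally {
  //     HRegion.closeHRegion(region);
  //   }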
1612 
1613   /**
1614    * Create an HRegion that writes to the local tmp dirs, using the specified HLog.
1615    * @param info Region info for the region to create.
1616    * @param desc Table descriptor for the region's table.
1617    * @param hlog HLog for this region to write to.
1618    * @return The created HRegion.
1619    * @throws IOException
1620    */
1621   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, HLog hlog) throws IOException {
1622     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, hlog);
1623   }
1624 
1625 
1626   /**
1627    * @param tableName Name of the region's table.
1628    * @param startKey Start key of the region.
1629    * @param stopKey Stop key of the region.
1630    * @param callingMethod Name of the calling method.
1631    * @param conf Configuration to use.
1632    * @param isReadOnly Whether the table should be marked read-only.
        * @param durability Durability setting for the table.
        * @param hlog HLog for the region to write to.
1633    * @param families Column families for the table.
1634    * @throws IOException if region creation fails.
1635    * @return A region on which you must call
1636    *         {@link HRegion#closeHRegion(HRegion)} when done.
1637    */
1638   public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1639       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1640       HLog hlog, byte[]... families) throws IOException {
1641     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1642     htd.setReadOnly(isReadOnly);
1643     for (byte[] family : families) {
1644       HColumnDescriptor hcd = new HColumnDescriptor(family);
1645       // Keep all versions so tests can read back every write.
1646       hcd.setMaxVersions(Integer.MAX_VALUE);
1647       htd.addFamily(hcd);
1648     }
1649     htd.setDurability(durability);
1650     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
1651     return createLocalHRegion(info, htd, hlog);
1652   }
1653   //
1654   // ==========================================================================
1655 
1656   /**
1657    * Truncate an existing table by deleting all of its rows.
1658    * @param tableName existing table
1659    * @return An HTable for the truncated table.
1660    * @throws IOException if the truncate fails.
1661    */
1662   public HTable truncateTable(byte[] tableName) throws IOException {
1663     return truncateTable(TableName.valueOf(tableName));
1664   }
1665 
1666   /**
1667    * Truncate an existing table by deleting all of its rows.
1668    * @param tableName existing table
1669    * @return An HTable for the truncated table.
1670    * @throws IOException if the truncate fails.
1671    */
1672   public HTable truncateTable(TableName tableName) throws IOException {
1673     HTable table = new HTable(getConfiguration(), tableName);
1674     Scan scan = new Scan();
1675     ResultScanner resScan = table.getScanner(scan);
1676     for(Result res : resScan) {
1677       Delete del = new Delete(res.getRow());
1678       table.delete(del);
1679     }
1680     resScan = table.getScanner(scan);
1681     resScan.close();
1682     return table;
1683   }
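
  // Illustrative sketch, not part of the original source: truncating between test
  // methods so each starts from an empty table. The name is hypothetical.
  //
  //   util.truncateTable(TableName.valueOf("testVersions"));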
1684 
1685   /**
1686    * Load table with rows from 'aaa' to 'zzz'.
1687    * @param t Table
1688    * @param f Family
1689    * @return Count of rows loaded.
1690    * @throws IOException
1691    */
1692   public int loadTable(final HTable t, final byte[] f) throws IOException {
1693     return loadTable(t, new byte[][] {f});
1694   }
1695 
1696   /**
1697    * Load table with rows from 'aaa' to 'zzz'.
1698    * @param t Table
1699    * @param f Family
        * @param writeToWAL True to write puts to the WAL; false to skip it.
1700    * @return Count of rows loaded.
1701    * @throws IOException
1702    */
1703   public int loadTable(final HTable t, final byte[] f, boolean writeToWAL) throws IOException {
1704     return loadTable(t, new byte[][] {f}, null, writeToWAL);
1705   }
1706 
1707   /**
1708    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1709    * @param t Table
1710    * @param f Array of Families to load
1711    * @return Count of rows loaded.
1712    * @throws IOException
1713    */
1714   public int loadTable(final HTable t, final byte[][] f) throws IOException {
1715     return loadTable(t, f, null);
1716   }
1717 
1718   /**
1719    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1720    * @param t Table
1721    * @param f Array of Families to load
1722    * @param value the values of the cells. If null is passed, the row key is used as value
1723    * @return Count of rows loaded.
1724    * @throws IOException
1725    */
1726   public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException {
1727     return loadTable(t, f, value, true);
1728   }
1729 
1730   /**
1731    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1732    * @param t Table
1733    * @param f Array of Families to load
1734    * @param value the values of the cells. If null is passed, the row key is used as value
1735    * @return Count of rows loaded.
1736    * @throws IOException
1737    */
1738   public int loadTable(final HTable t, final byte[][] f, byte[] value, boolean writeToWAL) throws IOException {
1739     t.setAutoFlush(false);
1740     int rowCount = 0;
1741     for (byte[] row : HBaseTestingUtility.ROWS) {
1742       Put put = new Put(row);
1743       put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1744       for (int i = 0; i < f.length; i++) {
1745         put.add(f[i], null, value != null ? value : row);
1746       }
1747       t.put(put);
1748       rowCount++;
1749     }
1750     t.flushCommits();
1751     return rowCount;
1752   }
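
  // Illustrative sketch, not part of the original source: loadTable writes one row
  // per three-letter key 'aaa'..'zzz', i.e. 26^3 = 17576 rows, so the returned
  // count can be asserted directly. Assumes an HTable "t" with family fam1.
  //
  //   int loaded = util.loadTable(t, fam1);
  //   assertTrue(loaded == 17576);
  //   assertTrue(loaded == util.countRows(t));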
1753 
1754   /** Tracks and validates table rows
1755    * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])}.
1756    */
1757   public static class SeenRowTracker {
1758     int dim = 'z' - 'a' + 1;
1759     int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
1760     byte[] startRow;
1761     byte[] stopRow;
1762 
1763     public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1764       this.startRow = startRow;
1765       this.stopRow = stopRow;
1766     }
1767 
1768     void reset() {
1769       for (byte[] row : ROWS) {
1770         seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1771       }
1772     }
1773 
1774     int i(byte b) {
1775       return b - 'a';
1776     }
1777 
1778     public void addRow(byte[] row) {
1779       seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1780     }
1781 
1782     /** Validate that all the rows between startRow and stopRow are seen exactly once, and
1783      * that all other rows are not seen at all.
1784      */
1785     public void validate() {
1786       for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1787         for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1788           for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1789             int count = seenRows[i(b1)][i(b2)][i(b3)];
1790             int expectedCount = 0;
1791             if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
1792                 && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
1793               expectedCount = 1;
1794             }
1795             if (count != expectedCount) {
1796               String row = new String(new byte[] {b1,b2,b3});
1797               throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
1798             }
1799           }
1800         }
1801       }
1802     }
1803   }
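
  // Illustrative sketch, not part of the original source: using SeenRowTracker to
  // check that a bounded scan over a loadTable'd table returns exactly the rows in
  // ["bbb", "ddd"). Assumes an HTable "t" already loaded via loadTable.
  //
  //   SeenRowTracker tracker = new SeenRowTracker(Bytes.toBytes("bbb"), Bytes.toBytes("ddd"));
  //   ResultScanner scanner = t.getScanner(new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("ddd")));
  //   for (Result r : scanner) {
  //     tracker.addRow(r.getRow());
  //   }
  //   scanner.close();
  //   tracker.validate(); // throws RuntimeException on a missing or extra row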
1804 
1805   public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1806     return loadRegion(r, f, false);
1807   }
1808 
1809   /**
1810    * Load region with rows from 'aaa' to 'zzz'.
1811    * @param r Region
1812    * @param f Family
1813    * @param flush flush the cache if true
1814    * @return Count of rows loaded.
1815    * @throws IOException
1816    */
1817   public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
1818   throws IOException {
1819     byte[] k = new byte[3];
1820     int rowCount = 0;
1821     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1822       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1823         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1824           k[0] = b1;
1825           k[1] = b2;
1826           k[2] = b3;
1827           Put put = new Put(k);
1828           put.setDurability(Durability.SKIP_WAL);
1829           put.add(f, null, k);
1831 
1832           int preRowCount = rowCount;
1833           int pause = 10;
1834           int maxPause = 1000;
1835           while (rowCount == preRowCount) {
1836             try {
1837               r.put(put);
1838               rowCount++;
1839             } catch (RegionTooBusyException e) {
1840               pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1841               Threads.sleep(pause);
1842             }
1843           }
1844         }
1845       }
1846       if (flush) {
1847         r.flushcache();
1848       }
1849     }
1850     return rowCount;
1851   }
1852 
1853   public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
1854     for (int i = startRow; i < endRow; i++) {
1855       byte[] data = Bytes.toBytes(String.valueOf(i));
1856       Put put = new Put(data);
1857       put.add(f, null, data);
1858       t.put(put);
1859     }
1860   }
1861 
1862   /**
1863    * Return the number of rows in the given table.
1864    */
1865   public int countRows(final HTable table) throws IOException {
1866     Scan scan = new Scan();
1867     ResultScanner results = table.getScanner(scan);
1868     int count = 0;
1869     for (@SuppressWarnings("unused") Result res : results) {
1870       count++;
1871     }
1872     results.close();
1873     return count;
1874   }
1875 
1876   public int countRows(final HTable table, final byte[]... families) throws IOException {
1877     Scan scan = new Scan();
1878     for (byte[] family: families) {
1879       scan.addFamily(family);
1880     }
1881     ResultScanner results = table.getScanner(scan);
1882     int count = 0;
1883     for (@SuppressWarnings("unused") Result res : results) {
1884       count++;
1885     }
1886     results.close();
1887     return count;
1888   }
1889 
1890   /**
1891    * Return an md5 digest of the entire contents of a table.
1892    */
1893   public String checksumRows(final HTable table) throws Exception {
1894     Scan scan = new Scan();
1895     ResultScanner results = table.getScanner(scan);
1896     MessageDigest digest = MessageDigest.getInstance("MD5");
1897     for (Result res : results) {
1898       digest.update(res.getRow());
1899     }
1900     results.close();
         // MessageDigest#toString() does not render the digest; encode the digest bytes instead.
1901     return Bytes.toStringBinary(digest.digest());
1902   }
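
  // Illustrative sketch, not part of the original source: since the digest covers
  // row keys, equal checksums mean two tables hold the same row keys, e.g. after a
  // copy job. "source" and "copy" are hypothetical HTable references.
  //
  //   assertTrue(util.checksumRows(source).equals(util.checksumRows(copy)));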
1903 
1904   /**
1905    * Creates many regions, named "aaa" to "zzz".
1906    *
1907    * @param table  The table to use for the data.
1908    * @param columnFamily  The family to insert the data into.
1909    * @return count of regions created.
1910    * @throws IOException When creating the regions fails.
1911    */
1912   public int createMultiRegions(HTable table, byte[] columnFamily)
1913   throws IOException {
1914     return createMultiRegions(getConfiguration(), table, columnFamily);
1915   }
1916 
1917   /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */
1918   public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
1919   static {
1920     int i = 0;
1921     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1922       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1923         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1924           ROWS[i][0] = b1;
1925           ROWS[i][1] = b2;
1926           ROWS[i][2] = b3;
1927           i++;
1928         }
1929       }
1930     }
1931   }
1932 
1933   public static final byte[][] KEYS = {
1934     HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
1935     Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
1936     Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
1937     Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
1938     Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
1939     Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
1940     Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
1941     Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
1942     Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
1943   };
1944 
1945   public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
1946       Bytes.toBytes("bbb"),
1947       Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
1948       Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
1949       Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
1950       Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
1951       Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
1952       Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
1953       Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
1954       Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
1955   };
1956 
1957   /**
1958    * Creates many regions, named "aaa" to "zzz".
1959    * @param c Configuration to use.
1960    * @param table  The table to use for the data.
1961    * @param columnFamily  The family to insert the data into.
1962    * @return count of regions created.
1963    * @throws IOException When creating the regions fails.
1964    */
1965   public int createMultiRegions(final Configuration c, final HTable table,
1966       final byte[] columnFamily)
1967   throws IOException {
1968     return createMultiRegions(c, table, columnFamily, KEYS);
1969   }
1970 
1971   void makeDFSClientNonRetrying() {
1972     if (null == this.dfsCluster) {
1973       LOG.debug("dfsCluster has not started, can't make client non-retrying.");
1974       return;
1975     }
1976     try {
1977       final FileSystem filesystem = this.dfsCluster.getFileSystem();
1978       if (!(filesystem instanceof DistributedFileSystem)) {
1979         LOG.debug("dfsCluster is not backed by a DistributedFileSystem, can't make client non-retrying.");
1980         return;
1981       }
1982       // rely on FileSystem.CACHE to alter how we talk via DFSClient
1983       final DistributedFileSystem fs = (DistributedFileSystem)filesystem;
1984       // retrieve the backing DFSClient instance
1985       final Field dfsField = fs.getClass().getDeclaredField("dfs");
1986       dfsField.setAccessible(true);
1987       final Class<?> dfsClazz = dfsField.getType();
1988       final DFSClient dfs = DFSClient.class.cast(dfsField.get(fs));
1989 
1990       // expose the method for creating direct RPC connections.
1991       final Method createRPCNamenode = dfsClazz.getDeclaredMethod("createRPCNamenode", InetSocketAddress.class, Configuration.class, UserGroupInformation.class);
1992       createRPCNamenode.setAccessible(true);
1993 
1994       // grab the DFSClient instance's backing connection information
1995       final Field nnField = dfsClazz.getDeclaredField("nnAddress");
1996       nnField.setAccessible(true);
1997       final InetSocketAddress nnAddress = InetSocketAddress.class.cast(nnField.get(dfs));
1998       final Field confField = dfsClazz.getDeclaredField("conf");
1999       confField.setAccessible(true);
2000       final Configuration conf = Configuration.class.cast(confField.get(dfs));
2001       final Field ugiField = dfsClazz.getDeclaredField("ugi");
2002       ugiField.setAccessible(true);
2003       final UserGroupInformation ugi = UserGroupInformation.class.cast(ugiField.get(dfs));
2004 
2005       // replace the proxy for the namenode rpc with a direct instance
2006       final Field namenodeField = dfsClazz.getDeclaredField("namenode");
2007       namenodeField.setAccessible(true);
2008       namenodeField.set(dfs, createRPCNamenode.invoke(null, nnAddress, conf, ugi));
2009       LOG.debug("Set DFSClient namenode to bare RPC");
2010     } catch (Exception exception) {
2011       LOG.info("Could not alter DFSClient to be non-retrying.", exception);
2012     }
2013   }
2014 
2015   /**
2016    * Creates the specified number of regions in the specified table.
2017    * @param c Configuration to use.
2018    * @param table Table to add regions to.
2019    * @param family Family to use in the created regions.
2020    * @param numRegions Number of regions to create; must be at least 3.
2021    * @return Count of regions created.
2022    * @throws IOException When creating the regions fails.
2023    */
2024   public int createMultiRegions(final Configuration c, final HTable table,
2025       final byte [] family, int numRegions)
2026   throws IOException {
2027     if (numRegions < 3) throw new IOException("Must create at least 3 regions");
2028     byte [] startKey = Bytes.toBytes("aaaaa");
2029     byte [] endKey = Bytes.toBytes("zzzzz");
2030     byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2031     byte [][] regionStartKeys = new byte[splitKeys.length+1][];
2032     System.arraycopy(splitKeys, 0, regionStartKeys, 1, splitKeys.length);
2033     regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
2034     return createMultiRegions(c, table, family, regionStartKeys);
2035   }
2036 
2037   @SuppressWarnings("deprecation")
2038   public int createMultiRegions(final Configuration c, final HTable table,
2039       final byte[] columnFamily, byte [][] startKeys)
2040   throws IOException {
2041     Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2042     HTable meta = new HTable(c, TableName.META_TABLE_NAME);
2043     HTableDescriptor htd = table.getTableDescriptor();
2044     if(!htd.hasFamily(columnFamily)) {
2045       HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
2046       htd.addFamily(hcd);
2047     }
2048     // remove empty region - this is tricky as the mini cluster during the test
2049     // setup already has the "<tablename>,,123456789" row with an empty start
2050     // and end key. Adding the custom regions below adds those blindly,
2051     // including the new start region from empty to "bbb". lg
2052     List<byte[]> rows = getMetaTableRows(htd.getTableName());
2053     String regionToDeleteInFS = table
2054         .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0)
2055         .getRegionInfo().getEncodedName();
2056     List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
2057     // add custom ones
2058     int count = 0;
2059     for (int i = 0; i < startKeys.length; i++) {
2060       int j = (i + 1) % startKeys.length;
2061       HRegionInfo hri = new HRegionInfo(table.getName(),
2062         startKeys[i], startKeys[j]);
2063       MetaEditor.addRegionToMeta(meta, hri);
2064       newRegions.add(hri);
2065       count++;
2066     }
2067     // see comment above, remove "old" (or previous) single region
2068     for (byte[] row : rows) {
2069       LOG.info("createMultiRegions: deleting meta row -> " +
2070         Bytes.toStringBinary(row));
2071       meta.delete(new Delete(row));
2072     }
2073     // remove the "old" region from FS
         // Always use '/' here: these are FileSystem paths, not local file paths.
2074     Path tableDir = new Path(getDefaultRootDirPath().toString()
2075         + Path.SEPARATOR + htd.getTableName()
2076         + Path.SEPARATOR + regionToDeleteInFS);
2077     FileSystem.get(c).delete(tableDir, true);
2078     // flush cache of regions
2079     HConnection conn = table.getConnection();
2080     conn.clearRegionCache();
2081     // assign all the new regions IF table is enabled.
2082     HBaseAdmin admin = getHBaseAdmin();
2083     if (admin.isTableEnabled(table.getTableName())) {
2084       for(HRegionInfo hri : newRegions) {
2085         admin.assign(hri.getRegionName());
2086       }
2087     }
2088 
2089     meta.close();
2090 
2091     return count;
2092   }
2093 
2094   /**
2095    * Create rows in hbase:meta for regions of the specified table with the specified
2096    * start keys.  The first startKey should be a 0 length byte array if you
2097    * want to form a proper range of regions.
2098    * @param conf Configuration to use.
2099    * @param htd Table descriptor of the table to add regions for.
2100    * @param startKeys Start keys, one per region to create.
2101    * @return list of region info for regions added to meta
2102    * @throws IOException
2103    */
2104   public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2105       final HTableDescriptor htd, byte [][] startKeys)
2106   throws IOException {
2107     HTable meta = new HTable(conf, TableName.META_TABLE_NAME);
2108     Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2109     List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
2110     // add custom ones
2111     for (int i = 0; i < startKeys.length; i++) {
2112       int j = (i + 1) % startKeys.length;
2113       HRegionInfo hri = new HRegionInfo(htd.getTableName(), startKeys[i],
2114           startKeys[j]);
2115       MetaEditor.addRegionToMeta(meta, hri);
2116       newRegions.add(hri);
2117     }
2118 
2119     meta.close();
2120     return newRegions;
2121   }
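
  // Illustrative sketch, not part of the original source: seeding hbase:meta with
  // region rows for a table without creating the regions themselves, using the
  // canned KEYS split points. The descriptor name is hypothetical.
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testMetaOnly"));
  //   htd.addFamily(new HColumnDescriptor(fam1));
  //   List<HRegionInfo> added = util.createMultiRegionsInMeta(
  //       util.getConfiguration(), htd, HBaseTestingUtility.KEYS);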
2122 
2123   /**
2124    * Returns all rows from the hbase:meta table.
2125    *
2126    * @throws IOException When reading the rows fails.
2127    */
2128   public List<byte[]> getMetaTableRows() throws IOException {
2129     // TODO: Redo using MetaReader class
2130     HTable t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2131     List<byte[]> rows = new ArrayList<byte[]>();
2132     ResultScanner s = t.getScanner(new Scan());
2133     for (Result result : s) {
2134       LOG.info("getMetaTableRows: row -> " +
2135         Bytes.toStringBinary(result.getRow()));
2136       rows.add(result.getRow());
2137     }
2138     s.close();
2139     t.close();
2140     return rows;
2141   }
2142 
2143   /**
2144    * Returns all rows from the hbase:meta table for a given user table
2145    *
2146    * @throws IOException When reading the rows fails.
2147    */
2148   public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2149     // TODO: Redo using MetaReader.
2150     HTable t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2151     List<byte[]> rows = new ArrayList<byte[]>();
2152     ResultScanner s = t.getScanner(new Scan());
2153     for (Result result : s) {
2154       HRegionInfo info = HRegionInfo.getHRegionInfo(result);
2155       if (info == null) {
2156         LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2157         // TODO figure out what to do for this new hosed case.
2158         continue;
2159       }
2160 
2161       if (info.getTable().equals(tableName)) {
2162         LOG.info("getMetaTableRows: row -> " +
2163             Bytes.toStringBinary(result.getRow()) + info);
2164         rows.add(result.getRow());
2165       }
2166     }
2167     s.close();
2168     t.close();
2169     return rows;
2170   }
2171 
2172   /**
2173    * first region of the specified user table.
2174    * region of the specified user table.
2175    * It first searches for the meta rows that contain the region of the
2176    * specified table, then gets the index of that RS, and finally retrieves
2177    * the RS's reference.
2178    * @param tableName user table to lookup in hbase:meta
2179    * @return region server that holds it, null if the row doesn't exist
2180    * @throws IOException
2181    * @throws InterruptedException
2182    */
2183   public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
2184       throws IOException, InterruptedException {
2185     return getRSForFirstRegionInTable(TableName.valueOf(tableName));
2186   }
2187   /**
2188    * Tool to get the reference to the region server object that holds the
2189    * first region of the specified user table.
2190    * It first searches for the meta rows that contain the region of the
2191    * specified table, then gets the index of that RS, and finally retrieves
2192    * the RS's reference.
2193    * @param tableName user table to lookup in hbase:meta
2194    * @return region server that holds it, null if the row doesn't exist
2195    * @throws IOException
2196    */
2197   public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2198       throws IOException, InterruptedException {
2199     List<byte[]> metaRows = getMetaTableRows(tableName);
2200     if (metaRows == null || metaRows.isEmpty()) {
2201       return null;
2202     }
2203     LOG.debug("Found " + metaRows.size() + " rows for table " +
2204       tableName);
2205     byte [] firstrow = metaRows.get(0);
2206     LOG.debug("FirstRow=" + Bytes.toString(firstrow));
2207     long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2208       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2209     int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2210       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
         // HConstants.HBASE_CLIENT_PAUSE is in milliseconds.
2211     RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MILLISECONDS);
2212     while(retrier.shouldRetry()) {
2213       int index = getMiniHBaseCluster().getServerWith(firstrow);
2214       if (index != -1) {
2215         return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2216       }
2217       // Came back -1.  Region may not be online yet.  Sleep a while.
2218       retrier.sleepUntilNextRetry();
2219     }
2220     return null;
2221   }
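
  // Illustrative sketch, not part of the original source: fetching the server that
  // carries the first region, e.g. to abort it and exercise recovery. The table
  // name is hypothetical.
  //
  //   HRegionServer rs = util.getRSForFirstRegionInTable(TableName.valueOf("testVersions"));
  //   if (rs != null) {
  //     rs.abort("test: kill the server hosting the first region");
  //   }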
2222 
2223   /**
2224    * Starts a <code>MiniMRCluster</code> with a default number of
2225    * <code>TaskTracker</code>s.
2226    *
2227    * @throws IOException When starting the cluster fails.
2228    */
2229   public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2230     startMiniMapReduceCluster(2);
2231     return mrCluster;
2232   }
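
  // Illustrative sketch, not part of the original source: bracketing a MapReduce
  // test with the mini MR cluster; jobs built on util.getConfiguration() then run
  // against it.
  //
  //   util.startMiniMapReduceCluster();
  //   try {
  //     // submit a Job configured from util.getConfiguration() here
  //   } finally {
  //     util.shutdownMiniMapReduceCluster();
  //   }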
2233 
2234   /**
2235    * Tasktracker has a bug where changing the hadoop.log.dir system property
2236    * will not change its internal static LOG_DIR variable.
2237    */
2238   private void forceChangeTaskLogDir() {
2239     Field logDirField;
2240     try {
2241       logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2242       logDirField.setAccessible(true);
2243 
2244       Field modifiersField = Field.class.getDeclaredField("modifiers");
2245       modifiersField.setAccessible(true);
2246       modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2247 
2248       logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2249     } catch (SecurityException e) {
2250       throw new RuntimeException(e);
2251     } catch (NoSuchFieldException e) {
2253       throw new RuntimeException(e);
2254     } catch (IllegalArgumentException e) {
2255       throw new RuntimeException(e);
2256     } catch (IllegalAccessException e) {
2257       throw new RuntimeException(e);
2258     }
2259   }
2260 
2261   /**
2262    * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2263    * filesystem.
2264    * @param servers  The number of <code>TaskTracker</code>s to start.
2265    * @throws IOException When starting the cluster fails.
2266    */
2267   private void startMiniMapReduceCluster(final int servers) throws IOException {
2268     if (mrCluster != null) {
2269       throw new IllegalStateException("MiniMRCluster is already running");
2270     }
2271     LOG.info("Starting mini mapreduce cluster...");
2272     setupClusterTestDir();
2273     createDirsAndSetProperties();
2274 
2275     forceChangeTaskLogDir();
2276 
2277     //// hadoop2 specific settings
2278     // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2279     // We raise the allowed virtual-memory ratio so that processes don't get killed.
2280     conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2281 
2282     // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2283     // this avoids the problem by disabling speculative task execution in tests.
2284     conf.setBoolean("mapreduce.map.speculative", false);
2285     conf.setBoolean("mapreduce.reduce.speculative", false);
2286     ////
2287 
2288     // Allow the user to override FS URI for this map-reduce cluster to use.
2289     mrCluster = new MiniMRCluster(servers,
2290       FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2291       null, null, new JobConf(this.conf));
2292     JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2293     if (jobConf == null) {
2294       jobConf = mrCluster.createJobConf();
2295     }
2296 
2297     jobConf.set("mapred.local.dir",
2298       conf.get("mapred.local.dir")); //Hadoop MiniMR overwrites this while it should not
2299     LOG.info("Mini mapreduce cluster started");
2300 
2301     // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2302     // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2303     // necessary config properties here.  YARN-129 required adding a few properties.
2304     conf.set("mapred.job.tracker", jobConf.get("mapred.job.tracker"));
2305     // this is for mrv2 support; mr1 ignores it
2306     conf.set("mapreduce.framework.name", "yarn");
2307     conf.setBoolean("yarn.is.minicluster", true);
2308     String rmAddress = jobConf.get("yarn.resourcemanager.address");
2309     if (rmAddress != null) {
2310       conf.set("yarn.resourcemanager.address", rmAddress);
2311     }
2312     String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2313     if (historyAddress != null) {
2314       conf.set("mapreduce.jobhistory.address", historyAddress);
2315     }
2316     String schedulerAddress =
2317       jobConf.get("yarn.resourcemanager.scheduler.address");
2318     if (schedulerAddress != null) {
2319       conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2320     }
2321   }
2322 
2323   /**
2324    * Stops the previously started <code>MiniMRCluster</code>.
2325    */
2326   public void shutdownMiniMapReduceCluster() {
2327     LOG.info("Stopping mini mapreduce cluster...");
2328     if (mrCluster != null) {
2329       mrCluster.shutdown();
2330       mrCluster = null;
2331     }
2332     // Restore configuration to point to local jobtracker
2333     conf.set("mapred.job.tracker", "local");
2334     LOG.info("Mini mapreduce cluster stopped");
2335   }
2336 
2337   /**
2338    * Create a stubbed-out RegionServerServices, mainly for getting the FS.
2339    */
2340   public RegionServerServices createMockRegionServerService() throws IOException {
2341     return createMockRegionServerService((ServerName)null);
2342   }
2343 
2344   /**
2345    * Create a stubbed-out RegionServerServices, mainly for getting the FS.
2346    * This version is used by TestTokenAuthentication.
2347    */
2348   public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException {
2349     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2350     rss.setFileSystem(getTestFileSystem());
2351     rss.setRpcServer(rpc);
2352     return rss;
2353   }
2354 
2355   /**
2356    * Create a stubbed-out RegionServerServices, mainly for getting the FS.
2357    * This version is used by TestOpenRegionHandler.
2358    */
2359   public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2360     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2361     rss.setFileSystem(getTestFileSystem());
2362     return rss;
2363   }
2364 
2365   /**
2366    * Switches the logger for the given class to DEBUG level.
2367    *
2368    * @param clazz  The class for which to switch to debug logging.
2369    */
2370   public void enableDebug(Class<?> clazz) {
2371     Log l = LogFactory.getLog(clazz);
2372     if (l instanceof Log4JLogger) {
2373       ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2374     } else if (l instanceof Jdk14Logger) {
2375       ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2376     }
2377   }
2378 
2379   /**
2380    * Expire the Master's session
2381    * @throws Exception
2382    */
2383   public void expireMasterSession() throws Exception {
2384     HMaster master = getMiniHBaseCluster().getMaster();
2385     expireSession(master.getZooKeeper(), false);
2386   }
2387 
2388   /**
2389    * Expire a region server's session
2390    * @param index which RS
2391    * @throws Exception
2392    */
2393   public void expireRegionServerSession(int index) throws Exception {
2394     HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2395     expireSession(rs.getZooKeeper(), false);
2396     decrementMinRegionServerCount();
2397   }
2398 
2399   private void decrementMinRegionServerCount() {
2400     // decrement the count for this.conf, for the newly spawned master
2401     // this.hbaseCluster shares this configuration too
2402     decrementMinRegionServerCount(getConfiguration());
2403 
2404     // each master thread keeps a copy of configuration
2405     for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2406       decrementMinRegionServerCount(master.getMaster().getConfiguration());
2407     }
2408   }
2409 
2410   private void decrementMinRegionServerCount(Configuration conf) {
2411     int currentCount = conf.getInt(
2412         ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2413     if (currentCount != -1) {
2414       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2415           Math.max(currentCount - 1, 1));
2416     }
2417   }
2418 
2419   public void expireSession(ZooKeeperWatcher nodeZK) throws Exception {
2420    expireSession(nodeZK, false);
2421   }
2422 
2423   @Deprecated
2424   public void expireSession(ZooKeeperWatcher nodeZK, Server server)
2425     throws Exception {
2426     expireSession(nodeZK, false);
2427   }
2428 
2429   /**
2430    * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2431    * http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
2432    * There are issues when doing this:
2433    * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2434    * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2435    *
2436    * @param nodeZK - the ZK watcher to expire
2437    * @param checkStatus - true to check if we can create an HTable with the
2438    *                    current configuration.
2439    */
2440   public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
2441     throws Exception {
2442     Configuration c = new Configuration(this.conf);
2443     String quorumServers = ZKConfig.getZKQuorumServersString(c);
2444     ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2445     byte[] password = zk.getSessionPasswd();
2446     long sessionID = zk.getSessionId();
2447 
2448     // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2449     //  so we create a first watcher to be sure that the
2450     //  event was sent. We expect that if our watcher receives the event
2451     //  other watchers on the same machine will get it as well.
2452     // When we ask to close the connection, ZK does not close it before
2453     //  we receive all the events, so we don't have to capture the event;
2454     //  just closing the connection should be enough.
2455     ZooKeeper monitor = new ZooKeeper(quorumServers,
2456       1000, new org.apache.zookeeper.Watcher(){
2457       @Override
2458       public void process(WatchedEvent watchedEvent) {
2459         LOG.info("Monitor ZKW received event="+watchedEvent);
2460       }
2461     } , sessionID, password);
2462 
2463     // Making it expire
2464     ZooKeeper newZK = new ZooKeeper(quorumServers,
2465         1000, EmptyWatcher.instance, sessionID, password);
2466 
2467     // ensure that we have a connection to the server before closing down; otherwise
2468     // the close-session event will be eaten before we reach the CONNECTING state
2469     long start = System.currentTimeMillis();
2470     while (newZK.getState() != States.CONNECTED
2471          && System.currentTimeMillis() - start < 1000) {
2472        Thread.sleep(1);
2473     }
2474     newZK.close();
2475     LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2476 
2477     // Now closing & waiting to be sure that the clients get it.
2478     monitor.close();
2479 
2480     if (checkStatus) {
2481       new HTable(new Configuration(conf), TableName.META_TABLE_NAME).close();
2482     }
2483   }
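
  // Illustrative sketch, not part of the original source: expiring the master's
  // ZooKeeper session to exercise failover handling; afterwards a test would
  // assert that the cluster recovers.
  //
  //   util.expireMasterSession();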
2484 
2485   /**
2486    * Get the Mini HBase cluster.
2487    *
2488    * @return hbase cluster
2489    * @see #getHBaseClusterInterface()
2490    */
2491   public MiniHBaseCluster getHBaseCluster() {
2492     return getMiniHBaseCluster();
2493   }
2494 
2495   /**
2496    * Returns the HBaseCluster instance.
2497    * <p>Returned object can be any of the subclasses of HBaseCluster, and the
2498    * tests referring to this should not assume that the cluster is a mini cluster or a
2499    * distributed one. If the test only works on a mini cluster, then the specific
2500    * method {@link #getMiniHBaseCluster()} can be used instead without the
2501    * need to type-cast.
2502    */
2503   public HBaseCluster getHBaseClusterInterface() {
2504     //implementation note: we should rename this method as #getHBaseCluster(),
2505     //but this would require refactoring 90+ calls.
2506     return hbaseCluster;
2507   }
2508 
2509   /**
2510    * Returns an HBaseAdmin instance.
2511    * This instance is shared between HBaseTestingUtility instance users.
2512    * Closing it has no effect; it will be closed automatically when the
2513    * cluster shuts down.
2514    *
2515    * @return The HBaseAdmin instance.
2516    * @throws IOException
2517    */
2518   public synchronized HBaseAdmin getHBaseAdmin()
2519   throws IOException {
2520     if (hbaseAdmin == null){
2521       hbaseAdmin = new HBaseAdminForTests(getConfiguration());
2522     }
2523     return hbaseAdmin;
2524   }
2525 
2526   private HBaseAdminForTests hbaseAdmin = null;
2527   private static class HBaseAdminForTests extends HBaseAdmin {
2528     public HBaseAdminForTests(Configuration c) throws MasterNotRunningException,
2529         ZooKeeperConnectionException, IOException {
2530       super(c);
2531     }
2532 
2533     @Override
2534     public synchronized void close() throws IOException {
2535       LOG.warn("close() called on HBaseAdmin instance returned from HBaseTestingUtility.getHBaseAdmin()");
2536     }
2537 
2538     private synchronized void close0() throws IOException {
2539       super.close();
2540     }
2541   }
2542 
2543   /**
2544    * Returns a ZooKeeperWatcher instance.
2545    * This instance is shared between HBaseTestingUtility instance users.
2546    * Don't close it; it will be closed automatically when the
2547    * cluster shuts down.
2548    *
2549    * @return The ZooKeeperWatcher instance.
2550    * @throws IOException
2551    */
2552   public synchronized ZooKeeperWatcher getZooKeeperWatcher()
2553     throws IOException {
2554     if (zooKeeperWatcher == null) {
2555       zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
2556         new Abortable() {
2557         @Override public void abort(String why, Throwable e) {
2558           throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e);
2559         }
2560         @Override public boolean isAborted() {return false;}
2561       });
2562     }
2563     return zooKeeperWatcher;
2564   }
2565   private ZooKeeperWatcher zooKeeperWatcher;
2566 
2567 
2568 
2569   /**
2570    * Closes the named region.
2571    *
2572    * @param regionName  The region to close.
2573    * @throws IOException
2574    */
2575   public void closeRegion(String regionName) throws IOException {
2576     closeRegion(Bytes.toBytes(regionName));
2577   }
2578 
2579   /**
2580    * Closes the named region.
2581    *
2582    * @param regionName  The region to close.
2583    * @throws IOException
2584    */
2585   public void closeRegion(byte[] regionName) throws IOException {
2586     getHBaseAdmin().closeRegion(regionName, null);
2587   }
2588 
2589   /**
2590    * Closes the region containing the given row.
2591    *
2592    * @param row  The row to find the containing region.
2593    * @param table  The table to find the region.
2594    * @throws IOException
2595    */
2596   public void closeRegionByRow(String row, HTable table) throws IOException {
2597     closeRegionByRow(Bytes.toBytes(row), table);
2598   }
2599 
2600   /**
2601    * Closes the region containing the given row.
2602    *
2603    * @param row  The row to find the containing region.
2604    * @param table  The table to find the region.
2605    * @throws IOException
2606    */
2607   public void closeRegionByRow(byte[] row, HTable table) throws IOException {
2608     HRegionLocation hrl = table.getRegionLocation(row);
2609     closeRegion(hrl.getRegionInfo().getRegionName());
2610   }
2611 
2612   /**
2613    * Retrieves a splittable region randomly from tableName
2614    *
2615    * @param tableName name of table
2616    * @param maxAttempts maximum number of attempts, unlimited for value of -1
2617    * @return the HRegion chosen, null if none was found within limit of maxAttempts
2618    */
2619   public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2620     List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2621     int regCount = regions.size();
2622     Set<Integer> attempted = new HashSet<Integer>();
2623     int idx;
2624     int attempts = 0;
2625     do {
2626       regions = getHBaseCluster().getRegions(tableName);
2627       if (regCount != regions.size()) {
2628         // if there was region movement, clear attempted Set
2629         attempted.clear();
2630       }
2631       regCount = regions.size();
2632       // There is a chance that, before we get the region for the table from an RS, the region
2633       // may be going for CLOSE.  This can happen because online schema change is enabled.
2634       if (regCount > 0) {
2635         idx = random.nextInt(regCount);
2636         // if we have just tried this region, there is no need to try again
2637         if (attempted.contains(idx))
2638           continue;
2639         try {
2640           regions.get(idx).checkSplit();
2641           return regions.get(idx);
2642         } catch (Exception ex) {
2643           LOG.warn("Caught exception", ex);
2644           attempted.add(idx);
2645         }
2646       }
2647       attempts++;
2648     } while (maxAttempts == -1 || attempts < maxAttempts);
2649     return null;
2650   }
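
  // Illustrative sketch, not part of the original source: picking a splittable
  // region (up to 10 attempts) and asking the admin to split it. The table name is
  // hypothetical.
  //
  //   HRegion region = util.getSplittableRegion(TableName.valueOf("testPresplit"), 10);
  //   if (region != null) {
  //     util.getHBaseAdmin().split(region.getRegionName());
  //   }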
2651 
2652   public MiniZooKeeperCluster getZkCluster() {
2653     return zkCluster;
2654   }
2655 
2656   public void setZkCluster(MiniZooKeeperCluster zkCluster) {
2657     this.passedZkCluster = true;
2658     this.zkCluster = zkCluster;
2659     conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort());
2660   }
2661 
2662   public MiniDFSCluster getDFSCluster() {
2663     return dfsCluster;
2664   }
2665 
2666   public void setDFSCluster(MiniDFSCluster cluster) throws IOException {
2667     if (dfsCluster != null && dfsCluster.isClusterUp()) {
2668       throw new IOException("DFSCluster is already running! Shut it down first.");
2669     }
2670     this.dfsCluster = cluster;
2671   }
2672 
2673   public FileSystem getTestFileSystem() throws IOException {
2674     return HFileSystem.get(conf);
2675   }
2676 
2677   /**
2678    * Wait until all regions in a table have been assigned.  Waits the default
2679    * timeout (30 seconds) before giving up.
2680    * @param table Table to wait on.
2681    * @throws InterruptedException
2682    * @throws IOException
2683    */
2684   public void waitTableAvailable(byte[] table)
2685       throws InterruptedException, IOException {
2686     waitTableAvailable(getHBaseAdmin(), table, 30000);
2687   }
2688 
2689   public void waitTableAvailable(HBaseAdmin admin, byte[] table)
2690       throws InterruptedException, IOException {
2691     waitTableAvailable(admin, table, 30000);
2692   }
2693 
2694   /**
2695    * Wait until all regions in a table have been assigned
2696    * @param table Table to wait on.
2697    * @param timeoutMillis Timeout.
2698    * @throws InterruptedException
2699    * @throws IOException
2700    */
2701   public void waitTableAvailable(byte[] table, long timeoutMillis)
2702   throws InterruptedException, IOException {
2703     waitTableAvailable(getHBaseAdmin(), table, timeoutMillis);
2704   }
2705 
2706   public void waitTableAvailable(HBaseAdmin admin, byte[] table, long timeoutMillis)
2707   throws InterruptedException, IOException {
2708     long startWait = System.currentTimeMillis();
2709     while (!admin.isTableAvailable(table)) {
2710       assertTrue("Timed out waiting for table to become available " +
2711         Bytes.toStringBinary(table),
2712         System.currentTimeMillis() - startWait < timeoutMillis);
2713       Thread.sleep(200);
2714     }
2715   }
2716 
2717   /**
2718    * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and the
2719    * regions have all been assigned.  Will time out after the default period (30 seconds).
2720    * @see #waitTableAvailable(byte[])
2721    * @param table Table to wait on.
2723    * @throws InterruptedException
2724    * @throws IOException
2725    */
2726   public void waitTableEnabled(byte[] table)
2727       throws InterruptedException, IOException {
2728     waitTableEnabled(getHBaseAdmin(), table, 30000);
2729   }
2730 
2731   public void waitTableEnabled(HBaseAdmin admin, byte[] table)
2732       throws InterruptedException, IOException {
2733     waitTableEnabled(admin, table, 30000);
2734   }
2735 
2736   /**
2737    * Waits for a table to be 'enabled'.  Enabled means that the table is set as 'enabled' and the
2738    * regions have all been assigned.
2739    * @see #waitTableAvailable(byte[])
2740    * @param table Table to wait on.
2741    * @param timeoutMillis Time to wait on it being marked enabled.
2742    * @throws InterruptedException
2743    * @throws IOException
2744    */
2745   public void waitTableEnabled(byte[] table, long timeoutMillis)
2746   throws InterruptedException, IOException {
2747     waitTableEnabled(getHBaseAdmin(), table, timeoutMillis);
2748   }
2749 
2750   public void waitTableEnabled(HBaseAdmin admin, byte[] table, long timeoutMillis)
2751   throws InterruptedException, IOException {
2752     long startWait = System.currentTimeMillis();
2753     waitTableAvailable(admin, table, timeoutMillis);
2754     while (!admin.isTableEnabled(table)) {
2755       assertTrue("Timed out waiting for table to become available and enabled " +
2756          Bytes.toStringBinary(table),
2757          System.currentTimeMillis() - startWait < timeoutMillis);
2758       Thread.sleep(200);
2759     }
2760     // Finally make sure all regions are fully open and online out on the cluster. Regions may be
2761     // in the hbase:meta table and almost open on all regionservers, but setting the region
2762     // online in the regionserver is the very last thing done and can take a little while.
2763     // Below we do a get.  The get will retry on a NotServingRegionException or a
2764     // RegionOpeningException.  It is crude, but when it completes all regions will be online.
2765     try {
2766       Canary.sniff(admin, TableName.valueOf(table));
2767     } catch (Exception e) {
2768       throw new IOException(e);
2769     }
2770   }
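
  // Illustrative sketch, not part of the original source: after creating a table
  // through a bare HBaseAdmin (which only waits on hbase:meta), block until the
  // table is enabled and its regions are truly online. The name is hypothetical.
  //
  //   byte[] name = Bytes.toBytes("testEnabled");
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
  //   htd.addFamily(new HColumnDescriptor(fam1));
  //   util.getHBaseAdmin().createTable(htd);
  //   util.waitTableEnabled(name);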
2771 
2772   /**
2773    * Waits for a table to be 'disabled'.  Disabled means that the table is set as 'disabled'.
2774    * Will time out after the default period (30 seconds).
2775    * @param table Table to wait on.
2776    * @throws InterruptedException
2777    * @throws IOException
2778    */
2779   public void waitTableDisabled(byte[] table)
2780       throws InterruptedException, IOException {
2781     waitTableDisabled(getHBaseAdmin(), table, 30000);
2782   }
2783 
2784   public void waitTableDisabled(HBaseAdmin admin, byte[] table)
2785       throws InterruptedException, IOException {
2786     waitTableDisabled(admin, table, 30000);
2787   }
2788 
2789   /**
2790    * Waits for a table to be 'disabled'.  Disabled means that the table is set as 'disabled'.
2791    * @param table Table to wait on.
2792    * @param timeoutMillis Time to wait on it being marked disabled.
2793    * @throws InterruptedException
2794    * @throws IOException
2795    */
2796   public void waitTableDisabled(byte[] table, long timeoutMillis)
2797       throws InterruptedException, IOException {
2798     waitTableDisabled(getHBaseAdmin(), table, timeoutMillis);
2799   }
2800 
2801   public void waitTableDisabled(HBaseAdmin admin, byte[] table, long timeoutMillis)
2802       throws InterruptedException, IOException {
2803     TableName tableName = TableName.valueOf(table);
2804     long startWait = System.currentTimeMillis();
2805     while (!admin.isTableDisabled(tableName)) {
2806       assertTrue("Timed out waiting for table to become disabled " +
2807               Bytes.toStringBinary(table),
2808           System.currentTimeMillis() - startWait < timeoutMillis);
2809       Thread.sleep(200);
2810     }
2811   }
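
       // Usage sketch (illustrative): waiting for a disable to complete before
       // modifying or deleting a table; "exampleTable" is a hypothetical name.
       //
       //   getHBaseAdmin().disableTable(Bytes.toBytes("exampleTable"));
       //   waitTableDisabled(Bytes.toBytes("exampleTable"));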
2812 
2813   /**
2815    * Make sure that at least the specified number of region servers
2816    * are running.
2817    * @param num minimum number of region servers that should be running
2818    * @return true if we started some servers
2819    * @throws IOException
2820    */
2821   public boolean ensureSomeRegionServersAvailable(final int num)
2822       throws IOException {
2823     boolean startedServer = false;
2824     MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
2825     for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
2826       LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2827       startedServer = true;
2828     }
2829 
2830     return startedServer;
2831   }
2832 
2833 
2834   /**
2835    * Make sure that at least the specified number of region servers
2836    * are running. We don't count the ones that are currently stopping or are
2837    * stopped.
2838    * @param num minimum number of region servers that should be running
2839    * @return true if we started some servers
2840    * @throws IOException
2841    */
2842   public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
2843     throws IOException {
2844     boolean startedServer = ensureSomeRegionServersAvailable(num);
2845 
2846     int nonStoppedServers = 0;
2847     for (JVMClusterUtil.RegionServerThread rst :
2848       getMiniHBaseCluster().getRegionServerThreads()) {
2849 
2850       HRegionServer hrs = rst.getRegionServer();
2851       if (hrs.isStopping() || hrs.isStopped()) {
2852         LOG.info("A region server is stopped or stopping:"+hrs);
2853       } else {
2854         nonStoppedServers++;
2855       }
2856     }
2857     for (int i=nonStoppedServers; i<num; ++i) {
2858       LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2859       startedServer = true;
2860     }
2861     return startedServer;
2862   }
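
       // Usage sketch (illustrative): a test that needs a minimum cluster size can
       // top up the number of live, non-stopped region servers before running.
       //
       //   if (ensureSomeNonStoppedRegionServersAvailable(3)) {
       //     LOG.info("Had to start additional region servers");
       //   }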
2863 
2864 
2865   /**
2866    * This method clones the passed <code>c</code> configuration, setting a new
2867    * user into the clone.  Use it when getting new instances of FileSystem.  Only
2868    * works for DistributedFileSystem.
2869    * @param c Initial configuration
2870    * @param differentiatingSuffix Suffix to differentiate this user from others.
2871    * @return A new configuration instance with a different user set into it.
2872    * @throws IOException
2873    */
2874   public static User getDifferentUser(final Configuration c,
2875     final String differentiatingSuffix)
2876   throws IOException {
2877     FileSystem currentfs = FileSystem.get(c);
2878     if (!(currentfs instanceof DistributedFileSystem)) {
2879       return User.getCurrent();
2880     }
2881     // Else distributed filesystem.  Make a new instance per daemon.  Below
2882     // code is taken from the AppendTestUtil over in hdfs.
2883     String username = User.getCurrent().getName() +
2884       differentiatingSuffix;
2885     User user = User.createUserForTesting(c, username,
2886         new String[]{"supergroup"});
2887     return user;
2888   }
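
       // Usage sketch (illustrative, assumes a running mini DFS cluster): obtain a
       // distinct test user, then do filesystem work as that user so a fresh
       // FileSystem instance is created per daemon. The ".hrs" suffix is arbitrary.
       //
       //   final Configuration c = TEST_UTIL.getConfiguration();
       //   User user = HBaseTestingUtility.getDifferentUser(c, ".hrs");
       //   user.runAs(new PrivilegedExceptionAction<Object>() {
       //     public Object run() throws Exception {
       //       FileSystem fs = FileSystem.get(c); // new instance for this user
       //       return null;
       //     }
       //   });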
2889 
2890   /**
2891    * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append it's hard-coded to 5 and
2892    * makes tests linger.  Here is the exception you'll see:
2893    * <pre>
2894    * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2895    * </pre>
2896    * @param stream A DFSClient.DFSOutputStream.
2897    * @param max the new maximum recovery error count; reflection failures are
2898    *   caught and logged rather than rethrown
2902    */
2903   public static void setMaxRecoveryErrorCount(final OutputStream stream,
2904       final int max) {
2905     try {
2906       Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
2907       for (Class<?> clazz: clazzes) {
2908         String className = clazz.getSimpleName();
2909         if (className.equals("DFSOutputStream")) {
2910           if (clazz.isInstance(stream)) {
2911             Field maxRecoveryErrorCountField =
2912               stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2913             maxRecoveryErrorCountField.setAccessible(true);
2914             maxRecoveryErrorCountField.setInt(stream, max);
2915             break;
2916           }
2917         }
2918       }
2919     } catch (Exception e) {
2920       LOG.info("Could not set max recovery field", e);
2921     }
2922   }
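
       // Usage sketch (illustrative; fs and path are assumed to exist in the calling
       // test): lower the recovery retry count on a DFS output stream so
       // recovery-failure tests finish quickly. getWrappedStream() unwraps the
       // FSDataOutputStream down to the underlying DFSOutputStream.
       //
       //   FSDataOutputStream out = fs.create(path);
       //   HBaseTestingUtility.setMaxRecoveryErrorCount(out.getWrappedStream(), 1);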
2923 
2924   /**
2925    * Wait until all regions for a table in hbase:meta have a non-empty
2926    * info:server, up to 60 seconds. This means all regions have been deployed, and the
2927    * master has been informed and has updated hbase:meta with each region's deployed
2928    * server.
2929    * @param tableName the table name
2930    * @throws IOException
2931    */
2932   public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
2933     waitUntilAllRegionsAssigned(tableName, 60000);
2934   }
2935 
2936   /**
2937    * Wait until all regions for a table in hbase:meta have a non-empty
2938    * info:server, or until timeout.  This means all regions have been deployed, and the
2939    * master has been informed and has updated hbase:meta with each region's deployed
2940    * server.
2941    * @param tableName the table name
2942    * @param timeout timeout, in milliseconds
2943    * @throws IOException
2944    */
2945   public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
2946       throws IOException {
2947     final HTable meta = new HTable(getConfiguration(), TableName.META_TABLE_NAME);
2948     try {
2949       waitFor(timeout, 200, true, new Predicate<IOException>() {
2950         @Override
2951         public boolean evaluate() throws IOException {
2952           boolean allRegionsAssigned = true;
2953           Scan scan = new Scan();
2954           scan.addFamily(HConstants.CATALOG_FAMILY);
2955           ResultScanner s = meta.getScanner(scan);
2956           try {
2957             Result r;
2958             while ((r = s.next()) != null) {
2959               byte [] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
2960               HRegionInfo info = HRegionInfo.parseFromOrNull(b);
2961               if (info != null && info.getTable().equals(tableName)) {
2962                 b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
2963                 allRegionsAssigned &= (b != null);
2964               }
2965             }
2966           } finally {
2967             s.close();
2968           }
2969           return allRegionsAssigned;
2970         }
2971       });
2972     } finally {
2973       meta.close();
2974     }
2975     // So, all regions are in the meta table but make sure master knows of the assignments before
2976     // returning -- sometimes this can lag.
2977     HMaster master = getHBaseCluster().getMaster();
2978     final RegionStates states = master.getAssignmentManager().getRegionStates();
2979     waitFor(timeout, 200, new Predicate<IOException>() {
2980       @Override
2981       public boolean evaluate() throws IOException {
2982         List<HRegionInfo> hris = states.getRegionsOfTable(tableName);
2983         return hris != null && !hris.isEmpty();
2984       }
2985     });
2986   }
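
       // Usage sketch (illustrative): block until every region of a newly created
       // table has a server recorded in hbase:meta; the names are hypothetical.
       //
       //   byte[] tableName = Bytes.toBytes("exampleTable");
       //   TEST_UTIL.createTable(tableName, Bytes.toBytes("f"));
       //   TEST_UTIL.waitUntilAllRegionsAssigned(TableName.valueOf(tableName), 60000);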
2987 
2988   /**
2989    * Do a small get/scan against one store. This is required because store
2990    * has no actual methods of querying itself, and relies on StoreScanner.
2991    */
2992   public static List<Cell> getFromStoreFile(HStore store,
2993                                                 Get get) throws IOException {
2994     Scan scan = new Scan(get);
2995     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
2996         scan.getFamilyMap().get(store.getFamily().getName()),
2997         // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
2998         // readpoint 0.
2999         0);
3000 
3001     List<Cell> result = new ArrayList<Cell>();
3002     scanner.next(result);
3003     if (!result.isEmpty()) {
3004       // verify that we are on the row we want:
3005       Cell kv = result.get(0);
3006       if (!CellUtil.matchingRow(kv, get.getRow())) {
3007         result.clear();
3008       }
3009     }
3010     scanner.close();
3011     return result;
3012   }
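
       // Usage sketch (illustrative): query a single store directly. The region,
       // family, and row variables are assumed to exist in the calling test, and the
       // cast assumes the region hands back an HStore.
       //
       //   HStore store = (HStore) region.getStore(family);
       //   List<Cell> cells = getFromStoreFile(store, new Get(row));
       //   assertTrue(cells.isEmpty() || CellUtil.matchingRow(cells.get(0), row));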
3013 
3014   /**
3015    * Create region split start keys between startKey and endKey.
3016    *
3017    * @param startKey the first key in the range to split
3018    * @param endKey the last key in the range to split
3019    * @param numRegions the number of regions to be created. It has to be greater than 3.
3020    * @return an array of numRegions region start keys, beginning with the empty byte array
3021    */
3022   public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3023     assertTrue(numRegions>3);
3024     byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3025     byte [][] result = new byte[tmpSplitKeys.length+1][];
3026     System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3027     result[0] = HConstants.EMPTY_BYTE_ARRAY;
3028     return result;
3029   }
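
       // Worked example (illustrative): getRegionSplitStartKeys("aaa", "zzz", 10)
       // returns 10 region start keys: the empty byte array, "aaa", seven evenly
       // spaced intermediate keys, and "zzz". Bytes.split produces numRegions - 1
       // keys including both endpoints, and the empty start key is prepended.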
3030 
3031   /**
3032    * Do a small get/scan against one store. This is required because store
3033    * has no actual methods of querying itself, and relies on StoreScanner.
3034    */
3035   public static List<Cell> getFromStoreFile(HStore store,
3036                                                 byte [] row,
3037                                                 NavigableSet<byte[]> columns
3038                                                 ) throws IOException {
3039     Get get = new Get(row);
3040     Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3041     s.put(store.getFamily().getName(), columns);
3042 
3043     return getFromStoreFile(store,get);
3044   }
3045 
3046   /**
3047    * Gets a ZooKeeperWatcher.
3048    * @param TEST_UTIL the testing utility whose configuration is used to connect
3049    */
3050   public static ZooKeeperWatcher getZooKeeperWatcher(
3051       HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException,
3052       IOException {
3053     ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
3054         "unittest", new Abortable() {
3055           boolean aborted = false;
3056 
3057           @Override
3058           public void abort(String why, Throwable e) {
3059             aborted = true;
3060             throw new RuntimeException("Fatal ZK error, why=" + why, e);
3061           }
3062 
3063           @Override
3064           public boolean isAborted() {
3065             return aborted;
3066           }
3067         });
3068     return zkw;
3069   }
3070 
3071   /**
3072    * Creates a znode with OPENED state.
3073    * @param TEST_UTIL the testing utility to get a ZooKeeperWatcher from
3074    * @param region the region whose assignment znode is created
3075    * @param serverName the server the znode transitions are done for
3076    * @return the ZooKeeperWatcher that was used
3077    * @throws IOException
3078    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
3079    * @throws KeeperException
3080    * @throws NodeExistsException
3081    */
3082   public static ZooKeeperWatcher createAndForceNodeToOpenedState(
3083       HBaseTestingUtility TEST_UTIL, HRegion region,
3084       ServerName serverName) throws ZooKeeperConnectionException,
3085       IOException, KeeperException, NodeExistsException {
3086     ZooKeeperWatcher zkw = getZooKeeperWatcher(TEST_UTIL);
3087     ZKAssign.createNodeOffline(zkw, region.getRegionInfo(), serverName);
3088     int version = ZKAssign.transitionNodeOpening(zkw, region
3089         .getRegionInfo(), serverName);
3090     ZKAssign.transitionNodeOpened(zkw, region.getRegionInfo(), serverName,
3091         version);
3092     return zkw;
3093   }
3094 
3095   public static void assertKVListsEqual(String additionalMsg,
3096       final List<? extends Cell> expected,
3097       final List<? extends Cell> actual) {
3098     final int eLen = expected.size();
3099     final int aLen = actual.size();
3100     final int minLen = Math.min(eLen, aLen);
3101 
3102     int i;
3103     for (i = 0; i < minLen
3104         && KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) == 0;
3105         ++i) {}
3106 
3107     if (additionalMsg == null) {
3108       additionalMsg = "";
3109     }
3110     if (!additionalMsg.isEmpty()) {
3111       additionalMsg = ". " + additionalMsg;
3112     }
3113 
3114     if (eLen != aLen || i != minLen) {
3115       throw new AssertionError(
3116           "Expected and actual KV arrays differ at position " + i + ": " +
3117           safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3118           safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3119     }
3120   }
3121 
3122   private static <T> String safeGetAsStr(List<T> lst, int i) {
3123     if (0 <= i && i < lst.size()) {
3124       return lst.get(i).toString();
3125     } else {
3126       return "<out_of_range>";
3127     }
3128   }
3129 
3130   public String getClusterKey() {
3131     return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3132         + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3133         + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3134             HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3135   }
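
       // Example output (illustrative): with default settings this returns something
       // like "localhost:2181:/hbase" -- the ZK quorum, client port, and parent znode
       // joined by ':'.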
3136 
3137   /** Creates a random table with the given parameters */
3138   public HTable createRandomTable(String tableName,
3139       final Collection<String> families,
3140       final int maxVersions,
3141       final int numColsPerRow,
3142       final int numFlushes,
3143       final int numRegions,
3144       final int numRowsPerFlush)
3145       throws IOException, InterruptedException {
3146 
3147     LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3148         " regions, " + numFlushes + " storefiles per region, " +
3149         numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3150         "\n");
3151 
3152     final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3153     final int numCF = families.size();
3154     final byte[][] cfBytes = new byte[numCF][];
3155     {
3156       int cfIndex = 0;
3157       for (String cf : families) {
3158         cfBytes[cfIndex++] = Bytes.toBytes(cf);
3159       }
3160     }
3161 
3162     final int actualStartKey = 0;
3163     final int actualEndKey = Integer.MAX_VALUE;
3164     final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3165     final int splitStartKey = actualStartKey + keysPerRegion;
3166     final int splitEndKey = actualEndKey - keysPerRegion;
3167     final String keyFormat = "%08x";
3168     final HTable table = createTable(tableName, cfBytes,
3169         maxVersions,
3170         Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3171         Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3172         numRegions);
3173 
3174     if (hbaseCluster != null) {
3175       getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3176     }
3177 
3178     for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3179       for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3180         final byte[] row = Bytes.toBytes(String.format(keyFormat,
3181             actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3182 
3183         Put put = new Put(row);
3184         Delete del = new Delete(row);
3185         for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3186           final byte[] cf = cfBytes[rand.nextInt(numCF)];
3187           final long ts = rand.nextInt();
3188           final byte[] qual = Bytes.toBytes("col" + iCol);
3189           if (rand.nextBoolean()) {
3190             final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3191                 "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3192                 ts + "_random_" + rand.nextLong());
3193             put.add(cf, qual, ts, value);
3194           } else if (rand.nextDouble() < 0.8) {
3195             del.deleteColumn(cf, qual, ts);
3196           } else {
3197             del.deleteColumns(cf, qual, ts);
3198           }
3199         }
3200 
3201         if (!put.isEmpty()) {
3202           table.put(put);
3203         }
3204 
3205         if (!del.isEmpty()) {
3206           table.delete(del);
3207         }
3208       }
3209       LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3210       table.flushCommits();
3211       if (hbaseCluster != null) {
3212         getMiniHBaseCluster().flushcache(table.getName());
3213       }
3214     }
3215 
3216     return table;
3217   }
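
       // Usage sketch (illustrative): an 8-region table with two families, three
       // versions, ten columns per row, and two flushes of a hundred rows each.
       //
       //   HTable t = createRandomTable("randomTable", Arrays.asList("cf1", "cf2"),
       //       3, 10, 2, 8, 100);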
3218 
3219   private static final int MIN_RANDOM_PORT = 0xc000;
3220   private static final int MAX_RANDOM_PORT = 0xfffe;
3221   private static Random random = new Random();
3222 
3223   /**
3224    * Returns a random port. These ports cannot be registered with IANA and are
3225    * intended for dynamic allocation (see http://bit.ly/dynports).
3226    */
3227   public static int randomPort() {
3228     return MIN_RANDOM_PORT
3229         + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3230   }
3231 
3232   /**
3233    * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3234    * called from single-threaded test setup code.
3235    */
3236   public static int randomFreePort() {
3237     int port = 0;
3238     do {
3239       port = randomPort();
3240       if (takenRandomPorts.contains(port)) {
3241         continue;
3242       }
3243       takenRandomPorts.add(port);
3244 
3245       try {
3246         ServerSocket sock = new ServerSocket(port);
3247         sock.close();
3248       } catch (IOException ex) {
3249         port = 0;
3250       }
3251     } while (port == 0);
3252     return port;
3253   }
3254 
3255 
3256   public static String randomMultiCastAddress() {
3257     return "226.1.1." + random.nextInt(254);
3258   }
3259 
3260 
3261 
3262   public static void waitForHostPort(String host, int port)
3263       throws IOException {
3264     final int maxTimeMs = 10000;
3265     final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3266     IOException savedException = null;
3267     LOG.info("Waiting for server at " + host + ":" + port);
3268     for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3269       try {
3270         Socket sock = new Socket(InetAddress.getByName(host), port);
3271         sock.close();
3272         savedException = null;
3273         LOG.info("Server at " + host + ":" + port + " is available");
3274         break;
3275       } catch (UnknownHostException e) {
3276         throw new IOException("Failed to look up " + host, e);
3277       } catch (IOException e) {
3278         savedException = e;
3279       }
3280       Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3281     }
3282 
3283     if (savedException != null) {
3284       throw savedException;
3285     }
3286   }
3287 
3288   /**
3289    * Creates a pre-split table for load testing. If the table already exists,
3290    * logs a warning and continues.
3291    * @return the number of regions the table was split into
3292    */
3293   public static int createPreSplitLoadTestTable(Configuration conf,
3294       TableName tableName, byte[] columnFamily, Algorithm compression,
3295       DataBlockEncoding dataBlockEncoding) throws IOException {
3296     return createPreSplitLoadTestTable(conf, tableName,
3297       columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER,
3298       Durability.USE_DEFAULT);
3299   }

3300   /**
3301    * Creates a pre-split table for load testing. If the table already exists,
3302    * logs a warning and continues.
3303    * @return the number of regions the table was split into
3304    */
3305   public static int createPreSplitLoadTestTable(Configuration conf,
3306       TableName tableName, byte[] columnFamily, Algorithm compression,
3307       DataBlockEncoding dataBlockEncoding, int numRegionsPerServer,
3308       Durability durability)
3309           throws IOException {
3310     HTableDescriptor desc = new HTableDescriptor(tableName);
3311     desc.setDurability(durability);
3312     HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3313     hcd.setDataBlockEncoding(dataBlockEncoding);
3314     hcd.setCompressionType(compression);
3315     return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
3316   }
3317 
3318   /**
3319    * Creates a pre-split table for load testing. If the table already exists,
3320    * logs a warning and continues.
3321    * @return the number of regions the table was split into
3322    */
3323   public static int createPreSplitLoadTestTable(Configuration conf,
3324       HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
3325     return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3326   }
3327 
3328   /**
3329    * Creates a pre-split table for load testing. If the table already exists,
3330    * logs a warning and continues.
3331    * @return the number of regions the table was split into
3332    */
3333   public static int createPreSplitLoadTestTable(Configuration conf,
3334       HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
3335     if (!desc.hasFamily(hcd.getName())) {
3336       desc.addFamily(hcd);
3337     }
3338 
3339     int totalNumberOfRegions = 0;
3340     HBaseAdmin admin = new HBaseAdmin(conf);
3341     try {
3342       // Create a table with pre-split regions.
3343       // The number of splits is set as:
3344       //    (region servers * regions per region server).
3345       int numberOfServers = admin.getClusterStatus().getServers().size();
3346       if (numberOfServers == 0) {
3347         throw new IllegalStateException("No live regionservers");
3348       }
3349 
3350       totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3351       LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3352           "pre-splitting table into " + totalNumberOfRegions + " regions " +
3353           "(regions per server: " + numRegionsPerServer + ")");
3354 
3355       byte[][] splits = new RegionSplitter.HexStringSplit().split(
3356           totalNumberOfRegions);
3357 
3358       admin.createTable(desc, splits);
3359     } catch (MasterNotRunningException e) {
3360       LOG.error("Master not running", e);
3361       throw new IOException(e);
3362     } catch (TableExistsException e) {
3363       LOG.warn("Table " + desc.getTableName() +
3364           " already exists, continuing");
3365     } finally {
3366       admin.close();
3367     }
3368     return totalNumberOfRegions;
3369   }
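
       // Usage sketch (illustrative): pre-split a load test table across the live
       // region servers with no compression or encoding; the names are hypothetical.
       //
       //   int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
       //       TableName.valueOf("loadTable"), Bytes.toBytes("cf"),
       //       Compression.Algorithm.NONE, DataBlockEncoding.NONE);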
3370 
3371   public static int getMetaRSPort(Configuration conf) throws IOException {
3372     HTable table = new HTable(conf, TableName.META_TABLE_NAME);
3373     HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(""));
3374     table.close();
3375     return hloc.getPort();
3376   }
3377 
3378   /**
3379    *  Due to an async race condition, a region may not be in
3380    *  the online region list of a region server yet, even after
3381    *  the assignment znode is deleted and the new assignment
3382    *  is recorded in the master.
3383    */
3384   public void assertRegionOnServer(
3385       final HRegionInfo hri, final ServerName server,
3386       final long timeout) throws IOException, InterruptedException {
3387     long timeoutTime = System.currentTimeMillis() + timeout;
3388     while (true) {
3389       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3390       if (regions.contains(hri)) return;
3391       long now = System.currentTimeMillis();
3392       if (now > timeoutTime) break;
3393       Thread.sleep(10);
3394     }
3395     fail("Could not find region " + hri.getRegionNameAsString()
3396       + " on server " + server);
3397   }
3398 
3399   /**
3400    * Check to make sure the region is open on the specified
3401    * region server, but not on any other one.
3402    */
3403   public void assertRegionOnlyOnServer(
3404       final HRegionInfo hri, final ServerName server,
3405       final long timeout) throws IOException, InterruptedException {
3406     long timeoutTime = System.currentTimeMillis() + timeout;
3407     while (true) {
3408       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
3409       if (regions.contains(hri)) {
3410         List<JVMClusterUtil.RegionServerThread> rsThreads =
3411           getHBaseCluster().getLiveRegionServerThreads();
3412         for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3413           HRegionServer rs = rsThread.getRegionServer();
3414           if (server.equals(rs.getServerName())) {
3415             continue;
3416           }
3417           Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3418           for (HRegion r: hrs) {
3419             assertTrue("Region should not be double assigned",
3420               r.getRegionId() != hri.getRegionId());
3421           }
3422         }
3423         return; // good, we are happy
3424       }
3425       long now = System.currentTimeMillis();
3426       if (now > timeoutTime) break;
3427       Thread.sleep(10);
3428     }
3429     fail("Could not find region " + hri.getRegionNameAsString()
3430       + " on server " + server);
3431   }
3432 
3433   public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
3434       throws IOException {
3435     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
3436     htd.addFamily(hcd);
3437     HRegionInfo info =
3438         new HRegionInfo(TableName.valueOf(tableName), null, null, false);
3439     HRegion region =
3440         HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), htd);
3441     return region;
3442   }
3443 
3444   public void setFileSystemURI(String fsURI) {
3445     FS_URI = fsURI;
3446   }
3447 
3448   /**
3449    * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
3450    */
3451   public <E extends Exception> long waitFor(long timeout, Predicate<E> predicate)
3452       throws E {
3453     return Waiter.waitFor(this.conf, timeout, predicate);
3454   }
3455 
3456   /**
3457    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, Predicate)}.
3458    */
3459   public <E extends Exception> long waitFor(long timeout, long interval, Predicate<E> predicate)
3460       throws E {
3461     return Waiter.waitFor(this.conf, timeout, interval, predicate);
3462   }
3463 
3464   /**
3465    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)}.
3466    */
3467   public <E extends Exception> long waitFor(long timeout, long interval,
3468       boolean failIfTimeout, Predicate<E> predicate) throws E {
3469     return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
3470   }
3471   
3472   /**
3473    * Wait until no regions in transition.
3474    * @param timeout How long to wait.
3475    * @throws Exception
3476    */
3477   public void waitUntilNoRegionsInTransition(
3478       final long timeout) throws Exception {
3479     waitFor(timeout, predicateNoRegionsInTransition());
3480   }
3481 
3482   /**
3483    * Returns a {@link Predicate} for checking that there are no regions in transition in master
3484    */
3485   public Waiter.Predicate<Exception> predicateNoRegionsInTransition() {
3486     return new Waiter.Predicate<Exception>() {
3487       @Override
3488       public boolean evaluate() throws Exception {
3489         final RegionStates regionStates = getMiniHBaseCluster().getMaster()
3490             .getAssignmentManager().getRegionStates();
3491         return !regionStates.isRegionsInTransition();
3492       }
3493     };
3494   }
3495 
3496   /**
3497    * Returns a {@link Predicate} for checking that table is enabled
3498    */
3499   public Waiter.Predicate<Exception> predicateTableEnabled(final TableName tableName) {
3500     return new Waiter.Predicate<Exception>() {
3501      @Override
3502      public boolean evaluate() throws Exception {
3503        return getHBaseAdmin().isTableEnabled(tableName);
3504       }
3505     };
3506   }
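
       // Usage sketch (illustrative): the waitFor wrappers above pair with these
       // predicates; "exampleTable" is a hypothetical name.
       //
       //   waitUntilNoRegionsInTransition(60000);
       //   waitFor(60000, predicateTableEnabled(TableName.valueOf("exampleTable")));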
3507 
3508   /**
3509    * Create a set of column descriptors with every combination of the available
3510    * compression, data block encoding, and bloom filter types.
3511    * @return the list of column descriptors
3512    */
3513   public static List<HColumnDescriptor> generateColumnDescriptors() {
3514     return generateColumnDescriptors("");
3515   }
3516 
3517   /**
3518    * Create a set of column descriptors with every combination of the available
3519    * compression, data block encoding, and bloom filter types.
3520    * @param prefix family names prefix
3521    * @return the list of column descriptors
3522    */
3523   public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
3524     List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
3525     long familyId = 0;
3526     for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
3527       for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
3528         for (BloomType bloomType: BloomType.values()) {
3529           String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3530           HColumnDescriptor htd = new HColumnDescriptor(name);
3531           htd.setCompressionType(compressionType);
3532           htd.setDataBlockEncoding(encodingType);
3533           htd.setBloomFilterType(bloomType);
3534           htds.add(htd);
3535           familyId++;
3536         }
3537       }
3538     }
3539     return htds;
3540   }
3541 
3542   /**
3543    * Get supported compression algorithms.
3544    * @return supported compression algorithms.
3545    */
3546   public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3547     String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3548     List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
3549     for (String algoName : allAlgos) {
3550       try {
3551         Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3552         algo.getCompressor();
3553         supportedAlgos.add(algo);
3554       } catch (Throwable t) {
3555         // this algo is not available
3556       }
3557     }
3558     return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3559   }
3560 
3561 }