View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
22  
23  import java.io.IOException;
24  import java.io.PrintStream;
25  import java.lang.reflect.Constructor;
26  import java.math.BigDecimal;
27  import java.math.MathContext;
28  import java.text.DecimalFormat;
29  import java.text.SimpleDateFormat;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Date;
33  import java.util.Map;
34  import java.util.Random;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Executors;
40  import java.util.concurrent.Future;
41  
42  import com.google.common.util.concurrent.ThreadFactoryBuilder;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.conf.Configured;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.client.Durability;
51  import org.apache.hadoop.hbase.client.Get;
52  import org.apache.hadoop.hbase.client.HBaseAdmin;
53  import org.apache.hadoop.hbase.client.HConnection;
54  import org.apache.hadoop.hbase.client.HConnectionManager;
55  import org.apache.hadoop.hbase.client.HTableInterface;
56  import org.apache.hadoop.hbase.client.Put;
57  import org.apache.hadoop.hbase.client.Result;
58  import org.apache.hadoop.hbase.client.ResultScanner;
59  import org.apache.hadoop.hbase.client.Scan;
60  import org.apache.hadoop.hbase.filter.BinaryComparator;
61  import org.apache.hadoop.hbase.filter.CompareFilter;
62  import org.apache.hadoop.hbase.filter.Filter;
63  import org.apache.hadoop.hbase.filter.FilterAllFilter;
64  import org.apache.hadoop.hbase.filter.FilterList;
65  import org.apache.hadoop.hbase.filter.PageFilter;
66  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
67  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
68  import org.apache.hadoop.hbase.io.compress.Compression;
69  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
70  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
71  import org.apache.hadoop.hbase.regionserver.BloomType;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.Hash;
74  import org.apache.hadoop.hbase.util.MurmurHash;
75  import org.apache.hadoop.hbase.util.Pair;
76  import org.apache.hadoop.io.LongWritable;
77  import org.apache.hadoop.io.Text;
78  import org.apache.hadoop.mapreduce.Job;
79  import org.apache.hadoop.mapreduce.Mapper;
80  import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
81  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
82  import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
83  import org.apache.hadoop.util.Tool;
84  import org.apache.hadoop.util.ToolRunner;
85  import org.codehaus.jackson.map.ObjectMapper;
86  
87  import com.google.common.util.concurrent.ThreadFactoryBuilder;
88  import com.yammer.metrics.core.Histogram;
89  import com.yammer.metrics.core.MetricsRegistry;
90  
91  /**
92   * Script used evaluating HBase performance and scalability.  Runs a HBase
93   * client that steps through one of a set of hardcoded tests or 'experiments'
94   * (e.g. a random reads test, a random writes test, etc.). Pass on the
95   * command-line which test to run and how many clients are participating in
96   * this experiment. Run <code>java PerformanceEvaluation --help</code> to
97   * obtain usage.
98   *
99   * <p>This class sets up and runs the evaluation programs described in
100  * Section 7, <i>Performance Evaluation</i>, of the <a
101  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
102  * paper, pages 8-10.
103  *
104  * <p>If number of clients > 1, we start up a MapReduce job. Each map task
105  * runs an individual client. Each client does about 1GB of data.
106  */
107 public class PerformanceEvaluation extends Configured implements Tool {
108   protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
109 
110   public static final String TABLE_NAME = "TestTable";
111   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
112   public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
113   public static final int VALUE_LENGTH = 1000;
114   public static final int ROW_LENGTH = 26;
115 
116   private static final int ONE_GB = 1024 * 1024 * 1000;
117   private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH;
118   // TODO : should we make this configurable
119   private static final int TAG_LENGTH = 256;
120   private static final DecimalFormat FMT = new DecimalFormat("0.##");
121   private static final MathContext CXT = MathContext.DECIMAL64;
122   private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
123   private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
124   private static final TestOptions DEFAULT_OPTS = new TestOptions();
125 
126   protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
127 
128   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
129 
130   /**
131    * Enum for map metrics.  Keep it out here rather than inside in the Map
132    * inner-class so we can find associated properties.
133    */
134   protected static enum Counter {
135     /** elapsed time */
136     ELAPSED_TIME,
137     /** number of rows */
138     ROWS
139   }
140 
141   /**
142    * Constructor
143    * @param conf Configuration object
144    */
145   public PerformanceEvaluation(final Configuration conf) {
146     super(conf);
147 
148     addCommandDescriptor(RandomReadTest.class, "randomRead",
149         "Run random read test");
150     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
151         "Run random seek and scan 100 test");
152     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
153         "Run random seek scan with both start and stop row (max 10 rows)");
154     addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
155         "Run random seek scan with both start and stop row (max 100 rows)");
156     addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
157         "Run random seek scan with both start and stop row (max 1000 rows)");
158     addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
159         "Run random seek scan with both start and stop row (max 10000 rows)");
160     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
161         "Run random write test");
162     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
163         "Run sequential read test");
164     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
165         "Run sequential write test");
166     addCommandDescriptor(ScanTest.class, "scan",
167         "Run scan test (read every row)");
168     addCommandDescriptor(FilteredScanTest.class, "filterScan",
169         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
170   }
171 
172   protected void addCommandDescriptor(Class<? extends Test> cmdClass,
173       String name, String description) {
174     CmdDescriptor cmdDescriptor =
175       new CmdDescriptor(cmdClass, name, description);
176     commands.put(name, cmdDescriptor);
177   }
178 
179   /**
180    * Implementations can have their status set.
181    */
182   interface Status {
183     /**
184      * Sets status
185      * @param msg status message
186      * @throws IOException
187      */
188     void setStatus(final String msg) throws IOException;
189   }
190 
191   /**
192    * MapReduce job that runs a performance evaluation client in each map task.
193    */
194   public static class EvaluationMapTask
195       extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
196 
197     /** configuration parameter name that contains the command */
198     public final static String CMD_KEY = "EvaluationMapTask.command";
199     /** configuration parameter name that contains the PE impl */
200     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
201 
202     private Class<? extends Test> cmd;
203     private PerformanceEvaluation pe;
204 
205     @Override
206     protected void setup(Context context) throws IOException, InterruptedException {
207       this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
208 
209       // this is required so that extensions of PE are instantiated within the
210       // map reduce task...
211       Class<? extends PerformanceEvaluation> peClass =
212           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
213       try {
214         this.pe = peClass.getConstructor(Configuration.class)
215             .newInstance(context.getConfiguration());
216       } catch (Exception e) {
217         throw new IllegalStateException("Could not instantiate PE instance", e);
218       }
219     }
220 
221     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
222       try {
223         return Class.forName(className).asSubclass(type);
224       } catch (ClassNotFoundException e) {
225         throw new IllegalStateException("Could not find class for name: " + className, e);
226       }
227     }
228 
229     protected void map(LongWritable key, Text value, final Context context)
230            throws IOException, InterruptedException {
231 
232       Status status = new Status() {
233         public void setStatus(String msg) {
234            context.setStatus(msg);
235         }
236       };
237 
238       ObjectMapper mapper = new ObjectMapper();
239       TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
240       Configuration conf = HBaseConfiguration.create(context.getConfiguration());
241 
242       // Evaluation task
243       long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status);
244       // Collect how much time the thing took. Report as map output and
245       // to the ELAPSED_TIME counter.
246       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
247       context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
248       context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
249       context.progress();
250     }
251   }
252 
253   /*
254    * If table does not already exist, create.
255    * @param c Client to use checking.
256    * @return True if we created the table.
257    * @throws IOException
258    */
259   private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
260     HTableDescriptor tableDescriptor = getTableDescriptor(opts);
261     if (opts.presplitRegions > 0) {
262       // presplit requested
263       if (admin.tableExists(tableDescriptor.getTableName())) {
264         admin.disableTable(tableDescriptor.getTableName());
265         admin.deleteTable(tableDescriptor.getTableName());
266       }
267 
268       byte[][] splits = getSplits(opts);
269       for (int i=0; i < splits.length; i++) {
270         LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
271       }
272       admin.createTable(tableDescriptor, splits);
273       LOG.info ("Table created with " + opts.presplitRegions + " splits");
274     }
275     else {
276       boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
277       if (!tableExists) {
278         admin.createTable(tableDescriptor);
279         LOG.info("Table " + tableDescriptor + " created");
280       }
281     }
282     return admin.tableExists(tableDescriptor.getTableName());
283   }
284 
285   /**
286    * Create an HTableDescriptor from provided TestOptions.
287    */
288   protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
289     HTableDescriptor desc = new HTableDescriptor(opts.tableName);
290     HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
291     family.setDataBlockEncoding(opts.blockEncoding);
292     family.setCompressionType(opts.compression);
293     family.setBloomFilterType(opts.bloomType);
294     if (opts.inMemoryCF) {
295       family.setInMemory(true);
296     }
297     desc.addFamily(family);
298     return desc;
299   }
300 
301   /**
302    * generates splits based on total number of rows and specified split regions
303    */
304   protected static byte[][] getSplits(TestOptions opts) {
305     if (opts.presplitRegions == 0)
306       return new byte [0][];
307 
308     int numSplitPoints = opts.presplitRegions - 1;
309     byte[][] splits = new byte[numSplitPoints][];
310     int jump = opts.totalRows / opts.presplitRegions;
311     for (int i = 0; i < numSplitPoints; i++) {
312       int rowkey = jump * (1 + i);
313       splits[i] = format(rowkey);
314     }
315     return splits;
316   }
317 
318   /*
319    * Run all clients in this vm each to its own thread.
320    * @param cmd Command to run.
321    * @throws IOException
322    */
323   private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
324       throws IOException, InterruptedException {
325     Future<Long>[] threads = new Future[opts.numClientThreads];
326     long[] timings = new long[opts.numClientThreads];
327     ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
328       new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
329     for (int i = 0; i < threads.length; i++) {
330       final int index = i;
331       threads[i] = pool.submit(new Callable<Long>() {
332         @Override
333         public Long call() throws Exception {
334           TestOptions threadOpts = new TestOptions(opts);
335           threadOpts.startRow = index * threadOpts.perClientRunRows;
336           long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
337             public void setStatus(final String msg) throws IOException {
338               LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
339             }
340           });
341           LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime +
342             "ms over " + threadOpts.perClientRunRows + " rows");
343           return elapsedTime;
344         }
345       });
346     }
347     pool.shutdown();
348     for (int i = 0; i < threads.length; i++) {
349       try {
350         timings[i] = threads[i].get();
351       } catch (ExecutionException e) {
352         throw new IOException(e.getCause());
353       }
354     }
355     final String test = cmd.getSimpleName();
356     LOG.info("[" + test + "] Summary of timings (ms): "
357              + Arrays.toString(timings));
358     Arrays.sort(timings);
359     long total = 0;
360     for (int i = 0; i < timings.length; i++) {
361       total += timings[i];
362     }
363     LOG.info("[" + test + "]"
364              + "\tMin: " + timings[0] + "ms"
365              + "\tMax: " + timings[timings.length - 1] + "ms"
366              + "\tAvg: " + (total / timings.length) + "ms");
367   }
368 
369   /*
370    * Run a mapreduce job.  Run as many maps as asked-for clients.
371    * Before we start up the job, write out an input file with instruction
372    * per client regards which row they are to start on.
373    * @param cmd Command to run.
374    * @throws IOException
375    */
376   private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
377         InterruptedException, ClassNotFoundException {
378     Configuration conf = getConf();
379     Path inputDir = writeInputFile(conf, opts);
380     conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
381     conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
382     Job job = new Job(conf);
383     job.setJarByClass(PerformanceEvaluation.class);
384     job.setJobName("HBase Performance Evaluation");
385 
386     job.setInputFormatClass(NLineInputFormat.class);
387     NLineInputFormat.setInputPaths(job, inputDir);
388     // this is default, but be explicit about it just in case.
389     NLineInputFormat.setNumLinesPerSplit(job, 1);
390 
391     job.setOutputKeyClass(LongWritable.class);
392     job.setOutputValueClass(LongWritable.class);
393 
394     job.setMapperClass(EvaluationMapTask.class);
395     job.setReducerClass(LongSumReducer.class);
396 
397     job.setNumReduceTasks(1);
398 
399     job.setOutputFormatClass(TextOutputFormat.class);
400     TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
401 
402     TableMapReduceUtil.addDependencyJars(job);
403     TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
404       DescriptiveStatistics.class, // commons-math
405       ObjectMapper.class);         // jackson-mapper-asl
406 
407     TableMapReduceUtil.initCredentials(job);
408 
409     job.waitForCompletion(true);
410   }
411 
412   /*
413    * Write input file of offsets-per-client for the mapreduce job.
414    * @param c Configuration
415    * @return Directory that contains file written.
416    * @throws IOException
417    */
418   private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
419     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
420     Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
421     Path inputDir = new Path(jobdir, "inputs");
422 
423     FileSystem fs = FileSystem.get(c);
424     fs.mkdirs(inputDir);
425 
426     Path inputFile = new Path(inputDir, "input.txt");
427     PrintStream out = new PrintStream(fs.create(inputFile));
428     // Make input random.
429     Map<Integer, String> m = new TreeMap<Integer, String>();
430     Hash h = MurmurHash.getInstance();
431     int perClientRows = (opts.totalRows / opts.numClientThreads);
432     ObjectMapper mapper = new ObjectMapper();
433     mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
434     try {
435       for (int i = 0; i < 10; i++) {
436         for (int j = 0; j < opts.numClientThreads; j++) {
437           TestOptions next = new TestOptions(opts);
438           next.startRow = (j * perClientRows) + (i * (perClientRows/10));
439           next.perClientRunRows = perClientRows / 10;
440           String s = mapper.writeValueAsString(next);
441           int hash = h.hash(Bytes.toBytes(s));
442           m.put(hash, s);
443         }
444       }
445       for (Map.Entry<Integer, String> e: m.entrySet()) {
446         out.println(e.getValue());
447       }
448     } finally {
449       out.close();
450     }
451     return inputDir;
452   }
453 
454   /**
455    * Describes a command.
456    */
457   static class CmdDescriptor {
458     private Class<? extends Test> cmdClass;
459     private String name;
460     private String description;
461 
462     CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
463       this.cmdClass = cmdClass;
464       this.name = name;
465       this.description = description;
466     }
467 
468     public Class<? extends Test> getCmdClass() {
469       return cmdClass;
470     }
471 
472     public String getName() {
473       return name;
474     }
475 
476     public String getDescription() {
477       return description;
478     }
479   }
480 
481   /**
482    * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
483    * This makes tracking all these arguments a little easier.
484    */
485   static class TestOptions {
486 
487     public TestOptions() {}
488 
489     public TestOptions(TestOptions that) {
490       this.nomapred = that.nomapred;
491       this.startRow = that.startRow;
492       this.perClientRunRows = that.perClientRunRows;
493       this.numClientThreads = that.numClientThreads;
494       this.totalRows = that.totalRows;
495       this.sampleRate = that.sampleRate;
496       this.tableName = that.tableName;
497       this.flushCommits = that.flushCommits;
498       this.writeToWAL = that.writeToWAL;
499       this.useTags = that.useTags;
500       this.noOfTags = that.noOfTags;
501       this.reportLatency = that.reportLatency;
502       this.multiGet = that.multiGet;
503       this.inMemoryCF = that.inMemoryCF;
504       this.presplitRegions = that.presplitRegions;
505       this.compression = that.compression;
506       this.blockEncoding = that.blockEncoding;
507       this.filterAll = that.filterAll;
508       this.bloomType = that.bloomType;
509       this.addColumns = that.addColumns;
510     }
511 
512     public boolean nomapred = false;
513     public boolean filterAll = false;
514     public int startRow = 0;
515     public int perClientRunRows = ROWS_PER_GB;
516     public int numClientThreads = 1;
517     public int totalRows = ROWS_PER_GB;
518     public float sampleRate = 1.0f;
519     public String tableName = TABLE_NAME;
520     public boolean flushCommits = true;
521     public boolean writeToWAL = true;
522     public boolean useTags = false;
523     public int noOfTags = 1;
524     public boolean reportLatency = false;
525     public int multiGet = 0;
526     boolean inMemoryCF = false;
527     int presplitRegions = 0;
528     public Compression.Algorithm compression = Compression.Algorithm.NONE;
529     public BloomType bloomType = BloomType.ROW;
530     public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
531     boolean addColumns = true;
532   }
533 
534   /*
535    * A test.
536    * Subclass to particularize what happens per row.
537    */
538   static abstract class Test {
539     // Below is make it so when Tests are all running in the one
540     // jvm, that they each have a differently seeded Random.
541     private static final Random randomSeed = new Random(System.currentTimeMillis());
542     private static long nextRandomSeed() {
543       return randomSeed.nextLong();
544     }
545     protected final Random rand = new Random(nextRandomSeed());
546     protected final Configuration conf;
547     protected final TestOptions opts;
548 
549     private final Status status;
550     protected HConnection connection;
551     protected HTableInterface table;
552 
553     /**
554      * Note that all subclasses of this class must provide a public contructor
555      * that has the exact same list of arguments.
556      */
557     Test(final Configuration conf, final TestOptions options, final Status status) {
558       this.conf = conf;
559       this.opts = options;
560       this.status = status;
561     }
562 
563     private String generateStatus(final int sr, final int i, final int lr) {
564       return sr + "/" + i + "/" + lr;
565     }
566 
567     protected int getReportingPeriod() {
568       int period = opts.perClientRunRows / 10;
569       return period == 0 ? opts.perClientRunRows : period;
570     }
571 
572     void testSetup() throws IOException {
573       this.connection = HConnectionManager.createConnection(conf);
574       this.table = connection.getTable(opts.tableName);
575       this.table.setAutoFlush(false, true);
576     }
577 
578     void testTakedown() throws IOException {
579       if (opts.flushCommits) {
580         this.table.flushCommits();
581       }
582       table.close();
583       connection.close();
584     }
585 
586     /*
587      * Run test
588      * @return Elapsed time.
589      * @throws IOException
590      */
591     long test() throws IOException {
592       testSetup();
593       LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
594       final long startTime = System.nanoTime();
595       try {
596         testTimed();
597       } finally {
598         testTakedown();
599       }
600       return (System.nanoTime() - startTime) / 1000000;
601     }
602 
603     /**
604      * Provides an extension point for tests that don't want a per row invocation.
605      */
606     void testTimed() throws IOException {
607       int lastRow = opts.startRow + opts.perClientRunRows;
608       // Report on completion of 1/10th of total.
609       for (int i = opts.startRow; i < lastRow; i++) {
610         testRow(i);
611         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
612           status.setStatus(generateStatus(opts.startRow, i, lastRow));
613         }
614       }
615     }
616 
617     /*
618     * Test for individual row.
619     * @param i Row index.
620     */
621     abstract void testRow(final int i) throws IOException;
622   }
623 
624 
625   @SuppressWarnings("unused")
626   static class RandomSeekScanTest extends Test {
627     RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
628       super(conf, options, status);
629     }
630 
631     @Override
632     void testRow(final int i) throws IOException {
633       Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
634       FilterList list = new FilterList();
635       if (opts.addColumns) {
636         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
637       }
638       if (opts.filterAll) {
639         list.addFilter(new FilterAllFilter());
640       }
641       list.addFilter(new WhileMatchFilter(new PageFilter(120)));
642       scan.setFilter(list);
643       ResultScanner s = this.table.getScanner(scan);
644       for (Result rr; (rr = s.next()) != null;) ;
645       s.close();
646     }
647 
648     @Override
649     protected int getReportingPeriod() {
650       int period = opts.perClientRunRows / 100;
651       return period == 0 ? opts.perClientRunRows : period;
652     }
653 
654   }
655 
656   @SuppressWarnings("unused")
657   static abstract class RandomScanWithRangeTest extends Test {
658     RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
659       super(conf, options, status);
660     }
661 
662     @Override
663     void testRow(final int i) throws IOException {
664       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
665       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
666       if (opts.filterAll) {
667         scan.setFilter(new FilterAllFilter());
668       }
669       if (opts.addColumns) {
670         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
671       }
672       ResultScanner s = this.table.getScanner(scan);
673       int count = 0;
674       for (Result rr; (rr = s.next()) != null;) {
675         count++;
676       }
677 
678       if (i % 100 == 0) {
679         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
680             Bytes.toString(startAndStopRow.getFirst()),
681             Bytes.toString(startAndStopRow.getSecond()), count));
682       }
683 
684       s.close();
685     }
686 
687     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
688 
689     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
690       int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
691       int stop = start + maxRange;
692       return new Pair<byte[],byte[]>(format(start), format(stop));
693     }
694 
695     @Override
696     protected int getReportingPeriod() {
697       int period = opts.perClientRunRows / 100;
698       return period == 0? opts.perClientRunRows: period;
699     }
700   }
701 
702   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
703     RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
704       super(conf, options, status);
705     }
706 
707     @Override
708     protected Pair<byte[], byte[]> getStartAndStopRow() {
709       return generateStartAndStopRows(10);
710     }
711   }
712 
713   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
714     RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
715       super(conf, options, status);
716     }
717 
718     @Override
719     protected Pair<byte[], byte[]> getStartAndStopRow() {
720       return generateStartAndStopRows(100);
721     }
722   }
723 
724   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
725     RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
726       super(conf, options, status);
727     }
728 
729     @Override
730     protected Pair<byte[], byte[]> getStartAndStopRow() {
731       return generateStartAndStopRows(1000);
732     }
733   }
734 
735   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
736     RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
737       super(conf, options, status);
738     }
739 
740     @Override
741     protected Pair<byte[], byte[]> getStartAndStopRow() {
742       return generateStartAndStopRows(10000);
743     }
744   }
745 
746   static class RandomReadTest extends Test {
747     private final int everyN;
748     private final double[] times;
749     private ArrayList<Get> gets;
750     int idx = 0;
751 
752     RandomReadTest(Configuration conf, TestOptions options, Status status) {
753       super(conf, options, status);
754       everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
755       LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
756       if (opts.multiGet > 0) {
757         LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
758         this.gets = new ArrayList<Get>(opts.multiGet);
759       }
760       if (opts.reportLatency) {
761         this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))];
762       } else {
763         this.times = null;
764       }
765     }
766 
767     @Override
768     void testRow(final int i) throws IOException {
769       if (i % everyN == 0) {
770         Get get = new Get(getRandomRow(this.rand, opts.totalRows));
771         if (opts.addColumns) {
772           get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
773         }
774         if (opts.filterAll) {
775           get.setFilter(new FilterAllFilter());
776         }
777         if (opts.multiGet > 0) {
778           this.gets.add(get);
779           if (this.gets.size() == opts.multiGet) {
780             long start = System.nanoTime();
781             this.table.get(this.gets);
782             if (opts.reportLatency) {
783               times[idx++] = (System.nanoTime() - start) / 1e6;
784             }
785             this.gets.clear();
786           }
787         } else {
788           long start = System.nanoTime();
789           this.table.get(get);
790           if (opts.reportLatency) {
791             times[idx++] = (System.nanoTime() - start) / 1e6;
792           }
793         }
794       }
795     }
796 
797     @Override
798     protected int getReportingPeriod() {
799       int period = opts.perClientRunRows / 100;
800       return period == 0 ? opts.perClientRunRows : period;
801     }
802 
803     @Override
804     protected void testTakedown() throws IOException {
805       if (this.gets != null && this.gets.size() > 0) {
806         this.table.get(gets);
807         this.gets.clear();
808       }
809       super.testTakedown();
810       if (opts.reportLatency) {
811         Arrays.sort(times);
812         DescriptiveStatistics ds = new DescriptiveStatistics();
813         for (double t : times) {
814           ds.addValue(t);
815         }
816         LOG.info("randomRead latency log (ms), on " + times.length + " measures");
817         LOG.info("99.9999% = " + ds.getPercentile(99.9999d));
818         LOG.info(" 99.999% = " + ds.getPercentile(99.999d));
819         LOG.info("  99.99% = " + ds.getPercentile(99.99d));
820         LOG.info("   99.9% = " + ds.getPercentile(99.9d));
821         LOG.info("     99% = " + ds.getPercentile(99d));
822         LOG.info("     95% = " + ds.getPercentile(95d));
823         LOG.info("     90% = " + ds.getPercentile(90d));
824         LOG.info("     80% = " + ds.getPercentile(80d));
825         LOG.info("Standard Deviation = " + ds.getStandardDeviation());
826         LOG.info("Mean = " + ds.getMean());
827       }
828     }
829   }
830 
831   static class RandomWriteTest extends Test {
832     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
833       super(conf, options, status);
834     }
835 
836     @Override
837     void testRow(final int i) throws IOException {
838       byte[] row = getRandomRow(this.rand, opts.totalRows);
839       Put put = new Put(row);
840       byte[] value = generateData(this.rand, VALUE_LENGTH);
841       if (opts.useTags) {
842         byte[] tag = generateData(this.rand, TAG_LENGTH);
843         Tag[] tags = new Tag[opts.noOfTags];
844         for (int n = 0; n < opts.noOfTags; n++) {
845           Tag t = new Tag((byte) n, tag);
846           tags[n] = t;
847         }
848         KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
849             value, tags);
850         put.add(kv);
851       } else {
852         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
853       }
854       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
855       table.put(put);
856     }
857   }
858 
859 
860   static class ScanTest extends Test {
861     private ResultScanner testScanner;
862 
863     ScanTest(Configuration conf, TestOptions options, Status status) {
864       super(conf, options, status);
865     }
866 
867     @Override
868     void testTakedown() throws IOException {
869       if (this.testScanner != null) {
870         this.testScanner.close();
871       }
872       super.testTakedown();
873     }
874 
875 
876     @Override
877     void testRow(final int i) throws IOException {
878       if (this.testScanner == null) {
879         Scan scan = new Scan(format(opts.startRow));
880         scan.setCaching(30);
881         if (opts.addColumns) {
882           scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
883         }
884         if (opts.filterAll) {
885           scan.setFilter(new FilterAllFilter());
886         }
887        this.testScanner = table.getScanner(scan);
888       }
889       testScanner.next();
890     }
891 
892   }
893 
894   static class SequentialReadTest extends Test {
895     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
896       super(conf, options, status);
897     }
898 
899     @Override
900     void testRow(final int i) throws IOException {
901       Get get = new Get(format(i));
902       if (opts.addColumns) {
903         get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
904       }
905       if (opts.filterAll) {
906         get.setFilter(new FilterAllFilter());
907       }
908       table.get(get);
909     }
910   }
911 
912   static class SequentialWriteTest extends Test {
913     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
914       super(conf, options, status);
915     }
916 
917     @Override
918     void testRow(final int i) throws IOException {
919       byte[] row = format(i);
920       Put put = new Put(row);
921       byte[] value = generateData(this.rand, VALUE_LENGTH);
922       if (opts.useTags) {
923         byte[] tag = generateData(this.rand, TAG_LENGTH);
924         Tag[] tags = new Tag[opts.noOfTags];
925         for (int n = 0; n < opts.noOfTags; n++) {
926           Tag t = new Tag((byte) n, tag);
927           tags[n] = t;
928         }
929         KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
930             value, tags);
931         put.add(kv);
932       } else {
933         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
934       }
935       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
936       table.put(put);
937     }
938   }
939 
940   static class FilteredScanTest extends Test {
941     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
942 
943     FilteredScanTest(Configuration conf, TestOptions options, Status status) {
944       super(conf, options, status);
945     }
946 
947     @Override
948     void testRow(int i) throws IOException {
949       byte[] value = generateData(this.rand, VALUE_LENGTH);
950       Scan scan = constructScan(value);
951       ResultScanner scanner = null;
952       try {
953         scanner = this.table.getScanner(scan);
954         while (scanner.next() != null) {
955         }
956       } finally {
957         if (scanner != null) scanner.close();
958       }
959     }
960 
961     protected Scan constructScan(byte[] valuePrefix) throws IOException {
962       FilterList list = new FilterList();
963       Filter filter = new SingleColumnValueFilter(
964           FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
965           new BinaryComparator(valuePrefix)
966       );
967       list.addFilter(filter);
968       if(opts.filterAll) {
969         list.addFilter(new FilterAllFilter());
970       }
971       Scan scan = new Scan();
972       if (opts.addColumns) {
973         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
974       }
975       scan.setFilter(list);
976       return scan;
977     }
978   }
979 
980   /**
981    * Compute a throughput rate in MB/s.
982    * @param rows Number of records consumed.
983    * @param timeMs Time taken in milliseconds.
984    * @return String value with label, ie '123.76 MB/s'
985    */
986   private static String calculateMbps(int rows, long timeMs) {
987     // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
988     //        * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
989     BigDecimal rowSize =
990       BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length);
991     BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
992       .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
993       .divide(BYTES_PER_MB, CXT);
994     return FMT.format(mbps) + " MB/s";
995   }
996 
997   /*
998    * Format passed integer.
999    * @param number
1000    * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
1001    * number (Does absolute in case number is negative).
1002    */
1003   public static byte [] format(final int number) {
1004     byte [] b = new byte[ROW_LENGTH];
1005     int d = Math.abs(number);
1006     for (int i = b.length - 1; i >= 0; i--) {
1007       b[i] = (byte)((d % 10) + '0');
1008       d /= 10;
1009     }
1010     return b;
1011   }
1012 
1013   /*
1014    * This method takes some time and is done inline uploading data.  For
1015    * example, doing the mapfile test, generation of the key and value
1016    * consumes about 30% of CPU time.
1017    * @return Generated random value to insert into a table cell.
1018    */
1019   public static byte[] generateData(final Random r, int length) {
1020     byte [] b = new byte [length];
1021     int i = 0;
1022 
1023     for(i = 0; i < (length-8); i += 8) {
1024       b[i] = (byte) (65 + r.nextInt(26));
1025       b[i+1] = b[i];
1026       b[i+2] = b[i];
1027       b[i+3] = b[i];
1028       b[i+4] = b[i];
1029       b[i+5] = b[i];
1030       b[i+6] = b[i];
1031       b[i+7] = b[i];
1032     }
1033 
1034     byte a = (byte) (65 + r.nextInt(26));
1035     for(; i < length; i++) {
1036       b[i] = a;
1037     }
1038     return b;
1039   }
1040 
1041   /**
1042    * @deprecated Use {@link #generateData(java.util.Random, int)} instead.
1043    * @return Generated random value to insert into a table cell.
1044    */
1045   @Deprecated
1046   public static byte[] generateValue(final Random r) {
1047     return generateData(r, VALUE_LENGTH);
1048   }
1049 
1050   static byte [] getRandomRow(final Random random, final int totalRows) {
1051     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1052   }
1053 
1054   static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
1055     final Status status)
1056       throws IOException {
1057     status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
1058       opts.perClientRunRows + " rows");
1059     long totalElapsedTime = 0;
1060 
1061     final Test t;
1062     try {
1063       Constructor<? extends Test> constructor =
1064         cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
1065       t = constructor.newInstance(conf, opts, status);
1066     } catch (NoSuchMethodException e) {
1067       throw new IllegalArgumentException("Invalid command class: " +
1068           cmd.getName() + ".  It does not provide a constructor as described by " +
1069           "the javadoc comment.  Available constructors are: " +
1070           Arrays.toString(cmd.getConstructors()));
1071     } catch (Exception e) {
1072       throw new IllegalStateException("Failed to construct command class", e);
1073     }
1074     totalElapsedTime = t.test();
1075 
1076     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1077       "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
1078       " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
1079     return totalElapsedTime;
1080   }
1081 
1082   private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
1083       InterruptedException, ClassNotFoundException {
1084     HBaseAdmin admin = null;
1085     try {
1086       admin = new HBaseAdmin(getConf());
1087       checkTable(admin, opts);
1088     } finally {
1089       if (admin != null) admin.close();
1090     }
1091     if (opts.nomapred) {
1092       doLocalClients(cmd, opts);
1093     } else {
1094       doMapReduce(cmd, opts);
1095     }
1096   }
1097 
1098   protected void printUsage() {
1099     printUsage(null);
1100   }
1101 
1102   protected void printUsage(final String message) {
1103     if (message != null && message.length() > 0) {
1104       System.err.println(message);
1105     }
1106     System.err.println("Usage: java " + this.getClass().getName() + " \\");
1107     System.err.println("  [--nomapred] [--rows=ROWS] [--table=NAME] \\");
1108     System.err.println("  [--compress=TYPE] [--blockEncoding=TYPE] " +
1109       "[-D<property=value>]* <command> <nclients>");
1110     System.err.println();
1111     System.err.println("Options:");
1112     System.err.println(" nomapred        Run multiple clients using threads " +
1113       "(rather than use mapreduce)");
1114     System.err.println(" rows            Rows each client runs. Default: One million");
1115     System.err.println(" sampleRate      Execute test on a sample of total " +
1116       "rows. Only supported by randomRead. Default: 1.0");
1117     System.err.println(" table           Alternate table name. Default: 'TestTable'");
1118     System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1119     System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
1120       "Default: false");
1121     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
1122     System.err.println(" presplit        Create presplit table. Recommended for accurate perf " +
1123       "analysis (see guide).  Default: disabled");
1124     System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
1125       "inmemory as far as possible. Not guaranteed that reads are always served " +
1126       "from memory.  Default: false");
1127     System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
1128       "Default: false");
1129     System.err.println(" numoftags       Specify the no of tags that would be needed. " +
1130        "This works only if usetags is true.");
1131     System.err.println(" filterAll       Helps to filter out all the rows on the server side"
1132         + " there by not returning any thing back to the client.  Helps to check the server side"
1133         + " performance.  Uses FilterAllFilter internally. ");
1134     System.err.println(" latency         Set to report operation latencies. " +
1135       "Currently only supported by randomRead test. Default: False");
1136     System.err.println(" bloomFilter      Bloom filter type, one of " + Arrays.toString(BloomType.values()));
1137     System.err.println();
1138     System.err.println(" Note: -D properties will be applied to the conf used. ");
1139     System.err.println("  For example: ");
1140     System.err.println("   -Dmapred.output.compress=true");
1141     System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
1142     System.err.println("   -Dmapreduce.task.timeout=60000");
1143     System.err.println();
1144     System.err.println("Command:");
1145     for (CmdDescriptor command : commands.values()) {
1146       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1147     }
1148     System.err.println();
1149     System.err.println("Args:");
1150     System.err.println(" nclients        Integer. Required. Total number of " +
1151       "clients (and HRegionServers)");
1152     System.err.println("                 running: 1 <= value <= 500");
1153     System.err.println("Examples:");
1154     System.err.println(" To run a single evaluation client:");
1155     System.err.println(" $ bin/hbase " + this.getClass().getName()
1156         + " sequentialWrite 1");
1157   }
1158 
1159   private static int getNumClients(final int start, final String[] args) {
1160     if(start + 1 > args.length) {
1161       throw new IllegalArgumentException("must supply the number of clients");
1162     }
1163     int N = Integer.parseInt(args[start]);
1164     if (N < 1) {
1165       throw new IllegalArgumentException("Number of clients must be > 1");
1166     }
1167     return N;
1168   }
1169 
1170   public int run(String[] args) throws Exception {
1171     // Process command-line args. TODO: Better cmd-line processing
1172     // (but hopefully something not as painful as cli options).
1173     int errCode = -1;
1174     if (args.length < 1) {
1175       printUsage();
1176       return errCode;
1177     }
1178 
1179     try {
1180       // MR-NOTE: if you are adding a property that is used to control an operation
1181       // like put(), get(), scan(), ... you must also add it as part of the MR 
1182       // input, take a look at writeInputFile().
1183       // Then you must adapt the LINE_PATTERN input regex,
1184       // and parse the argument, take a look at PEInputFormat.getSplits().
1185 
1186       TestOptions opts = new TestOptions();
1187 
1188       for (int i = 0; i < args.length; i++) {
1189         String cmd = args[i];
1190         if (cmd.equals("-h") || cmd.startsWith("--h")) {
1191           printUsage();
1192           errCode = 0;
1193           break;
1194         }
1195 
1196         final String nmr = "--nomapred";
1197         if (cmd.startsWith(nmr)) {
1198           opts.nomapred = true;
1199           continue;
1200         }
1201 
1202         final String rows = "--rows=";
1203         if (cmd.startsWith(rows)) {
1204           opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
1205           continue;
1206         }
1207 
1208         final String sampleRate = "--sampleRate=";
1209         if (cmd.startsWith(sampleRate)) {
1210           opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
1211           continue;
1212         }
1213 
1214         final String table = "--table=";
1215         if (cmd.startsWith(table)) {
1216           opts.tableName = cmd.substring(table.length());
1217           continue;
1218         }
1219 
1220         final String compress = "--compress=";
1221         if (cmd.startsWith(compress)) {
1222           opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1223           continue;
1224         }
1225 
1226         final String blockEncoding = "--blockEncoding=";
1227         if (cmd.startsWith(blockEncoding)) {
1228           opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1229           continue;
1230         }
1231 
1232         final String flushCommits = "--flushCommits=";
1233         if (cmd.startsWith(flushCommits)) {
1234           opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1235           continue;
1236         }
1237 
1238         final String writeToWAL = "--writeToWAL=";
1239         if (cmd.startsWith(writeToWAL)) {
1240           opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1241           continue;
1242         }
1243 
1244         final String presplit = "--presplit=";
1245         if (cmd.startsWith(presplit)) {
1246           opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1247           continue;
1248         }
1249 
1250         final String inMemory = "--inmemory=";
1251         if (cmd.startsWith(inMemory)) {
1252           opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1253           continue;
1254         }
1255 
1256         final String latency = "--latency";
1257         if (cmd.startsWith(latency)) {
1258           opts.reportLatency = true;
1259           continue;
1260         }
1261 
1262         final String multiGet = "--multiGet=";
1263         if (cmd.startsWith(multiGet)) {
1264           opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
1265           continue;
1266         }
1267 
1268         final String useTags = "--usetags=";
1269         if (cmd.startsWith(useTags)) {
1270           opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1271           continue;
1272         }
1273         
1274         final String noOfTags = "--numoftags=";
1275         if (cmd.startsWith(noOfTags)) {
1276           opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1277           continue;
1278         }
1279 
1280         final String filterOutAll = "--filterAll";
1281         if (cmd.startsWith(filterOutAll)) {
1282           opts.filterAll = true;
1283           continue;
1284         }
1285 
1286         final String bloomFilter = "--bloomFilter";
1287         if (cmd.startsWith(bloomFilter)) {
1288           opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
1289           continue;
1290         }
1291 
1292         final String addColumns = "--addColumns=";
1293         if (cmd.startsWith(addColumns)) {
1294           opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
1295           continue;
1296         }
1297 
1298         Class<? extends Test> cmdClass = determineCommandClass(cmd);
1299         if (cmdClass != null) {
1300           opts.numClientThreads = getNumClients(i + 1, args);
1301           // number of rows specified
1302           opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
1303           runTest(cmdClass, opts);
1304           errCode = 0;
1305           break;
1306         }
1307 
1308         printUsage();
1309         break;
1310       }
1311     } catch (Exception e) {
1312       e.printStackTrace();
1313     }
1314 
1315     return errCode;
1316   }
1317 
1318   private Class<? extends Test> determineCommandClass(String cmd) {
1319     CmdDescriptor descriptor = commands.get(cmd);
1320     return descriptor != null ? descriptor.getCmdClass() : null;
1321   }
1322 
1323   public static void main(final String[] args) throws Exception {
1324     int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
1325     System.exit(res);
1326   }
1327 }