1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
22
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.lang.reflect.Constructor;
26 import java.math.BigDecimal;
27 import java.math.MathContext;
28 import java.text.DecimalFormat;
29 import java.text.SimpleDateFormat;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.Map;
34 import java.util.Random;
35 import java.util.TreeMap;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41
42 import com.google.common.util.concurrent.ThreadFactoryBuilder;
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.conf.Configured;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.client.Durability;
51 import org.apache.hadoop.hbase.client.Get;
52 import org.apache.hadoop.hbase.client.HBaseAdmin;
53 import org.apache.hadoop.hbase.client.HConnection;
54 import org.apache.hadoop.hbase.client.HConnectionManager;
55 import org.apache.hadoop.hbase.client.HTableInterface;
56 import org.apache.hadoop.hbase.client.Put;
57 import org.apache.hadoop.hbase.client.Result;
58 import org.apache.hadoop.hbase.client.ResultScanner;
59 import org.apache.hadoop.hbase.client.Scan;
60 import org.apache.hadoop.hbase.filter.BinaryComparator;
61 import org.apache.hadoop.hbase.filter.CompareFilter;
62 import org.apache.hadoop.hbase.filter.Filter;
63 import org.apache.hadoop.hbase.filter.FilterAllFilter;
64 import org.apache.hadoop.hbase.filter.FilterList;
65 import org.apache.hadoop.hbase.filter.PageFilter;
66 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
67 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
68 import org.apache.hadoop.hbase.io.compress.Compression;
69 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
70 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
71 import org.apache.hadoop.hbase.regionserver.BloomType;
72 import org.apache.hadoop.hbase.util.Bytes;
73 import org.apache.hadoop.hbase.util.Hash;
74 import org.apache.hadoop.hbase.util.MurmurHash;
75 import org.apache.hadoop.hbase.util.Pair;
76 import org.apache.hadoop.io.LongWritable;
77 import org.apache.hadoop.io.Text;
78 import org.apache.hadoop.mapreduce.Job;
79 import org.apache.hadoop.mapreduce.Mapper;
80 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
81 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
82 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
83 import org.apache.hadoop.util.Tool;
84 import org.apache.hadoop.util.ToolRunner;
85 import org.codehaus.jackson.map.ObjectMapper;
86
87 import com.google.common.util.concurrent.ThreadFactoryBuilder;
88 import com.yammer.metrics.core.Histogram;
89 import com.yammer.metrics.core.MetricsRegistry;
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 public class PerformanceEvaluation extends Configured implements Tool {
/** Logger shared by this tool and its nested test classes. */
protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());

/** Default name of the table the benchmark reads and writes. */
public static final String TABLE_NAME = "TestTable";
/** The single column family every test writes to / reads from. */
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
/** The single column qualifier every test writes to / reads from. */
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
/** Size in bytes of each generated cell value. */
public static final int VALUE_LENGTH = 1000;
/** Size in bytes of each row key produced by format(int). */
public static final int ROW_LENGTH = 26;

// NOTE(review): 1024 * 1024 * 1000 is neither 10^9 nor 2^30; presumably intentional
// (it fixes the default row count) — confirm before changing.
private static final int ONE_GB = 1024 * 1024 * 1000;
/** Number of VALUE_LENGTH-byte values in ONE_GB; used as the default row counts. */
private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH;

/** Size in bytes of the random payload attached to each tag when tags are enabled. */
private static final int TAG_LENGTH = 256;
/** Formatter, math context and unit constants used to render MB/s figures. */
private static final DecimalFormat FMT = new DecimalFormat("0.##");
private static final MathContext CXT = MathContext.DECIMAL64;
private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
// A TestOptions holding every default value.
// NOTE(review): not referenced in the visible part of this file; presumably used by
// option parsing further down.
private static final TestOptions DEFAULT_OPTS = new TestOptions();

/** Registered commands keyed by command name; the TreeMap keeps them sorted by name. */
protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();

/** Parent directory under which MapReduce runs write their timestamped input/output dirs. */
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
129
130
131
132
133
134 protected static enum Counter {
135
136 ELAPSED_TIME,
137
138 ROWS
139 }
140
141
142
143
144
/**
 * Creates the tool and registers every built-in command.
 *
 * @param conf configuration handed to {@link Configured}
 */
public PerformanceEvaluation(final Configuration conf) {
  super(conf);

  // Point reads and short scans starting at random rows.
  addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test");
  addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
      "Run random seek and scan 100 test");

  // Bounded scans with both a start and a stop row.
  addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
  addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
  addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
  addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");

  // Writes and sequential access.
  addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
  addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
  addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");

  // Full-table scans.
  addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
  addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
}
171
172 protected void addCommandDescriptor(Class<? extends Test> cmdClass,
173 String name, String description) {
174 CmdDescriptor cmdDescriptor =
175 new CmdDescriptor(cmdClass, name, description);
176 commands.put(name, cmdDescriptor);
177 }
178
179
180
181
/**
 * Callback through which a running test reports progress messages.
 * In this file it is implemented by forwarding to the MapReduce task context
 * (EvaluationMapTask) or to the log (local client threads).
 */
interface Status {
  /**
   * Publishes a progress/status message.
   *
   * @param msg the message to report
   * @throws IOException if the underlying reporter fails
   */
  void setStatus(final String msg) throws IOException;
}
190
191
192
193
194 public static class EvaluationMapTask
195 extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
196
197
198 public final static String CMD_KEY = "EvaluationMapTask.command";
199
200 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
201
202 private Class<? extends Test> cmd;
203 private PerformanceEvaluation pe;
204
205 @Override
206 protected void setup(Context context) throws IOException, InterruptedException {
207 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
208
209
210
211 Class<? extends PerformanceEvaluation> peClass =
212 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
213 try {
214 this.pe = peClass.getConstructor(Configuration.class)
215 .newInstance(context.getConfiguration());
216 } catch (Exception e) {
217 throw new IllegalStateException("Could not instantiate PE instance", e);
218 }
219 }
220
221 private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
222 try {
223 return Class.forName(className).asSubclass(type);
224 } catch (ClassNotFoundException e) {
225 throw new IllegalStateException("Could not find class for name: " + className, e);
226 }
227 }
228
229 protected void map(LongWritable key, Text value, final Context context)
230 throws IOException, InterruptedException {
231
232 Status status = new Status() {
233 public void setStatus(String msg) {
234 context.setStatus(msg);
235 }
236 };
237
238 ObjectMapper mapper = new ObjectMapper();
239 TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
240 Configuration conf = HBaseConfiguration.create(context.getConfiguration());
241
242
243 long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status);
244
245
246 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
247 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
248 context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
249 context.progress();
250 }
251 }
252
253
254
255
256
257
258
259 private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
260 HTableDescriptor tableDescriptor = getTableDescriptor(opts);
261 if (opts.presplitRegions > 0) {
262
263 if (admin.tableExists(tableDescriptor.getTableName())) {
264 admin.disableTable(tableDescriptor.getTableName());
265 admin.deleteTable(tableDescriptor.getTableName());
266 }
267
268 byte[][] splits = getSplits(opts);
269 for (int i=0; i < splits.length; i++) {
270 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
271 }
272 admin.createTable(tableDescriptor, splits);
273 LOG.info ("Table created with " + opts.presplitRegions + " splits");
274 }
275 else {
276 boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
277 if (!tableExists) {
278 admin.createTable(tableDescriptor);
279 LOG.info("Table " + tableDescriptor + " created");
280 }
281 }
282 return admin.tableExists(tableDescriptor.getTableName());
283 }
284
285
286
287
288 protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
289 HTableDescriptor desc = new HTableDescriptor(opts.tableName);
290 HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
291 family.setDataBlockEncoding(opts.blockEncoding);
292 family.setCompressionType(opts.compression);
293 family.setBloomFilterType(opts.bloomType);
294 if (opts.inMemoryCF) {
295 family.setInMemory(true);
296 }
297 desc.addFamily(family);
298 return desc;
299 }
300
301
302
303
304 protected static byte[][] getSplits(TestOptions opts) {
305 if (opts.presplitRegions == 0)
306 return new byte [0][];
307
308 int numSplitPoints = opts.presplitRegions - 1;
309 byte[][] splits = new byte[numSplitPoints][];
310 int jump = opts.totalRows / opts.presplitRegions;
311 for (int i = 0; i < numSplitPoints; i++) {
312 int rowkey = jump * (1 + i);
313 splits[i] = format(rowkey);
314 }
315 return splits;
316 }
317
318
319
320
321
322
323 private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
324 throws IOException, InterruptedException {
325 Future<Long>[] threads = new Future[opts.numClientThreads];
326 long[] timings = new long[opts.numClientThreads];
327 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
328 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
329 for (int i = 0; i < threads.length; i++) {
330 final int index = i;
331 threads[i] = pool.submit(new Callable<Long>() {
332 @Override
333 public Long call() throws Exception {
334 TestOptions threadOpts = new TestOptions(opts);
335 threadOpts.startRow = index * threadOpts.perClientRunRows;
336 long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
337 public void setStatus(final String msg) throws IOException {
338 LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
339 }
340 });
341 LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime +
342 "ms over " + threadOpts.perClientRunRows + " rows");
343 return elapsedTime;
344 }
345 });
346 }
347 pool.shutdown();
348 for (int i = 0; i < threads.length; i++) {
349 try {
350 timings[i] = threads[i].get();
351 } catch (ExecutionException e) {
352 throw new IOException(e.getCause());
353 }
354 }
355 final String test = cmd.getSimpleName();
356 LOG.info("[" + test + "] Summary of timings (ms): "
357 + Arrays.toString(timings));
358 Arrays.sort(timings);
359 long total = 0;
360 for (int i = 0; i < timings.length; i++) {
361 total += timings[i];
362 }
363 LOG.info("[" + test + "]"
364 + "\tMin: " + timings[0] + "ms"
365 + "\tMax: " + timings[timings.length - 1] + "ms"
366 + "\tAvg: " + (total / timings.length) + "ms");
367 }
368
369
370
371
372
373
374
375
376 private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
377 InterruptedException, ClassNotFoundException {
378 Configuration conf = getConf();
379 Path inputDir = writeInputFile(conf, opts);
380 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
381 conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
382 Job job = new Job(conf);
383 job.setJarByClass(PerformanceEvaluation.class);
384 job.setJobName("HBase Performance Evaluation");
385
386 job.setInputFormatClass(NLineInputFormat.class);
387 NLineInputFormat.setInputPaths(job, inputDir);
388
389 NLineInputFormat.setNumLinesPerSplit(job, 1);
390
391 job.setOutputKeyClass(LongWritable.class);
392 job.setOutputValueClass(LongWritable.class);
393
394 job.setMapperClass(EvaluationMapTask.class);
395 job.setReducerClass(LongSumReducer.class);
396
397 job.setNumReduceTasks(1);
398
399 job.setOutputFormatClass(TextOutputFormat.class);
400 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
401
402 TableMapReduceUtil.addDependencyJars(job);
403 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
404 DescriptiveStatistics.class,
405 ObjectMapper.class);
406
407 TableMapReduceUtil.initCredentials(job);
408
409 job.waitForCompletion(true);
410 }
411
412
413
414
415
416
417
418 private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
419 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
420 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
421 Path inputDir = new Path(jobdir, "inputs");
422
423 FileSystem fs = FileSystem.get(c);
424 fs.mkdirs(inputDir);
425
426 Path inputFile = new Path(inputDir, "input.txt");
427 PrintStream out = new PrintStream(fs.create(inputFile));
428
429 Map<Integer, String> m = new TreeMap<Integer, String>();
430 Hash h = MurmurHash.getInstance();
431 int perClientRows = (opts.totalRows / opts.numClientThreads);
432 ObjectMapper mapper = new ObjectMapper();
433 mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
434 try {
435 for (int i = 0; i < 10; i++) {
436 for (int j = 0; j < opts.numClientThreads; j++) {
437 TestOptions next = new TestOptions(opts);
438 next.startRow = (j * perClientRows) + (i * (perClientRows/10));
439 next.perClientRunRows = perClientRows / 10;
440 String s = mapper.writeValueAsString(next);
441 int hash = h.hash(Bytes.toBytes(s));
442 m.put(hash, s);
443 }
444 }
445 for (Map.Entry<Integer, String> e: m.entrySet()) {
446 out.println(e.getValue());
447 }
448 } finally {
449 out.close();
450 }
451 return inputDir;
452 }
453
454
455
456
457 static class CmdDescriptor {
458 private Class<? extends Test> cmdClass;
459 private String name;
460 private String description;
461
462 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
463 this.cmdClass = cmdClass;
464 this.name = name;
465 this.description = description;
466 }
467
468 public Class<? extends Test> getCmdClass() {
469 return cmdClass;
470 }
471
472 public String getName() {
473 return name;
474 }
475
476 public String getDescription() {
477 return description;
478 }
479 }
480
481
482
483
484
485 static class TestOptions {
486
487 public TestOptions() {}
488
489 public TestOptions(TestOptions that) {
490 this.nomapred = that.nomapred;
491 this.startRow = that.startRow;
492 this.perClientRunRows = that.perClientRunRows;
493 this.numClientThreads = that.numClientThreads;
494 this.totalRows = that.totalRows;
495 this.sampleRate = that.sampleRate;
496 this.tableName = that.tableName;
497 this.flushCommits = that.flushCommits;
498 this.writeToWAL = that.writeToWAL;
499 this.useTags = that.useTags;
500 this.noOfTags = that.noOfTags;
501 this.reportLatency = that.reportLatency;
502 this.multiGet = that.multiGet;
503 this.inMemoryCF = that.inMemoryCF;
504 this.presplitRegions = that.presplitRegions;
505 this.compression = that.compression;
506 this.blockEncoding = that.blockEncoding;
507 this.filterAll = that.filterAll;
508 this.bloomType = that.bloomType;
509 this.addColumns = that.addColumns;
510 }
511
512 public boolean nomapred = false;
513 public boolean filterAll = false;
514 public int startRow = 0;
515 public int perClientRunRows = ROWS_PER_GB;
516 public int numClientThreads = 1;
517 public int totalRows = ROWS_PER_GB;
518 public float sampleRate = 1.0f;
519 public String tableName = TABLE_NAME;
520 public boolean flushCommits = true;
521 public boolean writeToWAL = true;
522 public boolean useTags = false;
523 public int noOfTags = 1;
524 public boolean reportLatency = false;
525 public int multiGet = 0;
526 boolean inMemoryCF = false;
527 int presplitRegions = 0;
528 public Compression.Algorithm compression = Compression.Algorithm.NONE;
529 public BloomType bloomType = BloomType.ROW;
530 public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
531 boolean addColumns = true;
532 }
533
534
535
536
537
538 static abstract class Test {
539
540
541 private static final Random randomSeed = new Random(System.currentTimeMillis());
542 private static long nextRandomSeed() {
543 return randomSeed.nextLong();
544 }
545 protected final Random rand = new Random(nextRandomSeed());
546 protected final Configuration conf;
547 protected final TestOptions opts;
548
549 private final Status status;
550 protected HConnection connection;
551 protected HTableInterface table;
552
553
554
555
556
557 Test(final Configuration conf, final TestOptions options, final Status status) {
558 this.conf = conf;
559 this.opts = options;
560 this.status = status;
561 }
562
563 private String generateStatus(final int sr, final int i, final int lr) {
564 return sr + "/" + i + "/" + lr;
565 }
566
567 protected int getReportingPeriod() {
568 int period = opts.perClientRunRows / 10;
569 return period == 0 ? opts.perClientRunRows : period;
570 }
571
572 void testSetup() throws IOException {
573 this.connection = HConnectionManager.createConnection(conf);
574 this.table = connection.getTable(opts.tableName);
575 this.table.setAutoFlush(false, true);
576 }
577
578 void testTakedown() throws IOException {
579 if (opts.flushCommits) {
580 this.table.flushCommits();
581 }
582 table.close();
583 connection.close();
584 }
585
586
587
588
589
590
591 long test() throws IOException {
592 testSetup();
593 LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
594 final long startTime = System.nanoTime();
595 try {
596 testTimed();
597 } finally {
598 testTakedown();
599 }
600 return (System.nanoTime() - startTime) / 1000000;
601 }
602
603
604
605
606 void testTimed() throws IOException {
607 int lastRow = opts.startRow + opts.perClientRunRows;
608
609 for (int i = opts.startRow; i < lastRow; i++) {
610 testRow(i);
611 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
612 status.setStatus(generateStatus(opts.startRow, i, lastRow));
613 }
614 }
615 }
616
617
618
619
620
621 abstract void testRow(final int i) throws IOException;
622 }
623
624
625 @SuppressWarnings("unused")
626 static class RandomSeekScanTest extends Test {
627 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
628 super(conf, options, status);
629 }
630
631 @Override
632 void testRow(final int i) throws IOException {
633 Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
634 FilterList list = new FilterList();
635 if (opts.addColumns) {
636 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
637 }
638 if (opts.filterAll) {
639 list.addFilter(new FilterAllFilter());
640 }
641 list.addFilter(new WhileMatchFilter(new PageFilter(120)));
642 scan.setFilter(list);
643 ResultScanner s = this.table.getScanner(scan);
644 for (Result rr; (rr = s.next()) != null;) ;
645 s.close();
646 }
647
648 @Override
649 protected int getReportingPeriod() {
650 int period = opts.perClientRunRows / 100;
651 return period == 0 ? opts.perClientRunRows : period;
652 }
653
654 }
655
656 @SuppressWarnings("unused")
657 static abstract class RandomScanWithRangeTest extends Test {
658 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
659 super(conf, options, status);
660 }
661
662 @Override
663 void testRow(final int i) throws IOException {
664 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
665 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
666 if (opts.filterAll) {
667 scan.setFilter(new FilterAllFilter());
668 }
669 if (opts.addColumns) {
670 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
671 }
672 ResultScanner s = this.table.getScanner(scan);
673 int count = 0;
674 for (Result rr; (rr = s.next()) != null;) {
675 count++;
676 }
677
678 if (i % 100 == 0) {
679 LOG.info(String.format("Scan for key range %s - %s returned %s rows",
680 Bytes.toString(startAndStopRow.getFirst()),
681 Bytes.toString(startAndStopRow.getSecond()), count));
682 }
683
684 s.close();
685 }
686
687 protected abstract Pair<byte[],byte[]> getStartAndStopRow();
688
689 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
690 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
691 int stop = start + maxRange;
692 return new Pair<byte[],byte[]>(format(start), format(stop));
693 }
694
695 @Override
696 protected int getReportingPeriod() {
697 int period = opts.perClientRunRows / 100;
698 return period == 0? opts.perClientRunRows: period;
699 }
700 }
701
702 static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
703 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
704 super(conf, options, status);
705 }
706
707 @Override
708 protected Pair<byte[], byte[]> getStartAndStopRow() {
709 return generateStartAndStopRows(10);
710 }
711 }
712
713 static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
714 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
715 super(conf, options, status);
716 }
717
718 @Override
719 protected Pair<byte[], byte[]> getStartAndStopRow() {
720 return generateStartAndStopRows(100);
721 }
722 }
723
724 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
725 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
726 super(conf, options, status);
727 }
728
729 @Override
730 protected Pair<byte[], byte[]> getStartAndStopRow() {
731 return generateStartAndStopRows(1000);
732 }
733 }
734
735 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
736 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
737 super(conf, options, status);
738 }
739
740 @Override
741 protected Pair<byte[], byte[]> getStartAndStopRow() {
742 return generateStartAndStopRows(10000);
743 }
744 }
745
746 static class RandomReadTest extends Test {
747 private final int everyN;
748 private final double[] times;
749 private ArrayList<Get> gets;
750 int idx = 0;
751
752 RandomReadTest(Configuration conf, TestOptions options, Status status) {
753 super(conf, options, status);
754 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
755 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
756 if (opts.multiGet > 0) {
757 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
758 this.gets = new ArrayList<Get>(opts.multiGet);
759 }
760 if (opts.reportLatency) {
761 this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))];
762 } else {
763 this.times = null;
764 }
765 }
766
767 @Override
768 void testRow(final int i) throws IOException {
769 if (i % everyN == 0) {
770 Get get = new Get(getRandomRow(this.rand, opts.totalRows));
771 if (opts.addColumns) {
772 get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
773 }
774 if (opts.filterAll) {
775 get.setFilter(new FilterAllFilter());
776 }
777 if (opts.multiGet > 0) {
778 this.gets.add(get);
779 if (this.gets.size() == opts.multiGet) {
780 long start = System.nanoTime();
781 this.table.get(this.gets);
782 if (opts.reportLatency) {
783 times[idx++] = (System.nanoTime() - start) / 1e6;
784 }
785 this.gets.clear();
786 }
787 } else {
788 long start = System.nanoTime();
789 this.table.get(get);
790 if (opts.reportLatency) {
791 times[idx++] = (System.nanoTime() - start) / 1e6;
792 }
793 }
794 }
795 }
796
797 @Override
798 protected int getReportingPeriod() {
799 int period = opts.perClientRunRows / 100;
800 return period == 0 ? opts.perClientRunRows : period;
801 }
802
803 @Override
804 protected void testTakedown() throws IOException {
805 if (this.gets != null && this.gets.size() > 0) {
806 this.table.get(gets);
807 this.gets.clear();
808 }
809 super.testTakedown();
810 if (opts.reportLatency) {
811 Arrays.sort(times);
812 DescriptiveStatistics ds = new DescriptiveStatistics();
813 for (double t : times) {
814 ds.addValue(t);
815 }
816 LOG.info("randomRead latency log (ms), on " + times.length + " measures");
817 LOG.info("99.9999% = " + ds.getPercentile(99.9999d));
818 LOG.info(" 99.999% = " + ds.getPercentile(99.999d));
819 LOG.info(" 99.99% = " + ds.getPercentile(99.99d));
820 LOG.info(" 99.9% = " + ds.getPercentile(99.9d));
821 LOG.info(" 99% = " + ds.getPercentile(99d));
822 LOG.info(" 95% = " + ds.getPercentile(95d));
823 LOG.info(" 90% = " + ds.getPercentile(90d));
824 LOG.info(" 80% = " + ds.getPercentile(80d));
825 LOG.info("Standard Deviation = " + ds.getStandardDeviation());
826 LOG.info("Mean = " + ds.getMean());
827 }
828 }
829 }
830
831 static class RandomWriteTest extends Test {
832 RandomWriteTest(Configuration conf, TestOptions options, Status status) {
833 super(conf, options, status);
834 }
835
836 @Override
837 void testRow(final int i) throws IOException {
838 byte[] row = getRandomRow(this.rand, opts.totalRows);
839 Put put = new Put(row);
840 byte[] value = generateData(this.rand, VALUE_LENGTH);
841 if (opts.useTags) {
842 byte[] tag = generateData(this.rand, TAG_LENGTH);
843 Tag[] tags = new Tag[opts.noOfTags];
844 for (int n = 0; n < opts.noOfTags; n++) {
845 Tag t = new Tag((byte) n, tag);
846 tags[n] = t;
847 }
848 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
849 value, tags);
850 put.add(kv);
851 } else {
852 put.add(FAMILY_NAME, QUALIFIER_NAME, value);
853 }
854 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
855 table.put(put);
856 }
857 }
858
859
860 static class ScanTest extends Test {
861 private ResultScanner testScanner;
862
863 ScanTest(Configuration conf, TestOptions options, Status status) {
864 super(conf, options, status);
865 }
866
867 @Override
868 void testTakedown() throws IOException {
869 if (this.testScanner != null) {
870 this.testScanner.close();
871 }
872 super.testTakedown();
873 }
874
875
876 @Override
877 void testRow(final int i) throws IOException {
878 if (this.testScanner == null) {
879 Scan scan = new Scan(format(opts.startRow));
880 scan.setCaching(30);
881 if (opts.addColumns) {
882 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
883 }
884 if (opts.filterAll) {
885 scan.setFilter(new FilterAllFilter());
886 }
887 this.testScanner = table.getScanner(scan);
888 }
889 testScanner.next();
890 }
891
892 }
893
894 static class SequentialReadTest extends Test {
895 SequentialReadTest(Configuration conf, TestOptions options, Status status) {
896 super(conf, options, status);
897 }
898
899 @Override
900 void testRow(final int i) throws IOException {
901 Get get = new Get(format(i));
902 if (opts.addColumns) {
903 get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
904 }
905 if (opts.filterAll) {
906 get.setFilter(new FilterAllFilter());
907 }
908 table.get(get);
909 }
910 }
911
912 static class SequentialWriteTest extends Test {
913 SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
914 super(conf, options, status);
915 }
916
917 @Override
918 void testRow(final int i) throws IOException {
919 byte[] row = format(i);
920 Put put = new Put(row);
921 byte[] value = generateData(this.rand, VALUE_LENGTH);
922 if (opts.useTags) {
923 byte[] tag = generateData(this.rand, TAG_LENGTH);
924 Tag[] tags = new Tag[opts.noOfTags];
925 for (int n = 0; n < opts.noOfTags; n++) {
926 Tag t = new Tag((byte) n, tag);
927 tags[n] = t;
928 }
929 KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
930 value, tags);
931 put.add(kv);
932 } else {
933 put.add(FAMILY_NAME, QUALIFIER_NAME, value);
934 }
935 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
936 table.put(put);
937 }
938 }
939
940 static class FilteredScanTest extends Test {
941 protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
942
943 FilteredScanTest(Configuration conf, TestOptions options, Status status) {
944 super(conf, options, status);
945 }
946
947 @Override
948 void testRow(int i) throws IOException {
949 byte[] value = generateData(this.rand, VALUE_LENGTH);
950 Scan scan = constructScan(value);
951 ResultScanner scanner = null;
952 try {
953 scanner = this.table.getScanner(scan);
954 while (scanner.next() != null) {
955 }
956 } finally {
957 if (scanner != null) scanner.close();
958 }
959 }
960
961 protected Scan constructScan(byte[] valuePrefix) throws IOException {
962 FilterList list = new FilterList();
963 Filter filter = new SingleColumnValueFilter(
964 FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
965 new BinaryComparator(valuePrefix)
966 );
967 list.addFilter(filter);
968 if(opts.filterAll) {
969 list.addFilter(new FilterAllFilter());
970 }
971 Scan scan = new Scan();
972 if (opts.addColumns) {
973 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
974 }
975 scan.setFilter(list);
976 return scan;
977 }
978 }
979
980
981
982
983
984
985
986 private static String calculateMbps(int rows, long timeMs) {
987
988
989 BigDecimal rowSize =
990 BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length);
991 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
992 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
993 .divide(BYTES_PER_MB, CXT);
994 return FMT.format(mbps) + " MB/s";
995 }
996
997
998
999
1000
1001
1002
1003 public static byte [] format(final int number) {
1004 byte [] b = new byte[ROW_LENGTH];
1005 int d = Math.abs(number);
1006 for (int i = b.length - 1; i >= 0; i--) {
1007 b[i] = (byte)((d % 10) + '0');
1008 d /= 10;
1009 }
1010 return b;
1011 }
1012
1013
1014
1015
1016
1017
1018
1019 public static byte[] generateData(final Random r, int length) {
1020 byte [] b = new byte [length];
1021 int i = 0;
1022
1023 for(i = 0; i < (length-8); i += 8) {
1024 b[i] = (byte) (65 + r.nextInt(26));
1025 b[i+1] = b[i];
1026 b[i+2] = b[i];
1027 b[i+3] = b[i];
1028 b[i+4] = b[i];
1029 b[i+5] = b[i];
1030 b[i+6] = b[i];
1031 b[i+7] = b[i];
1032 }
1033
1034 byte a = (byte) (65 + r.nextInt(26));
1035 for(; i < length; i++) {
1036 b[i] = a;
1037 }
1038 return b;
1039 }
1040
1041
1042
1043
1044
1045 @Deprecated
1046 public static byte[] generateValue(final Random r) {
1047 return generateData(r, VALUE_LENGTH);
1048 }
1049
1050 static byte [] getRandomRow(final Random random, final int totalRows) {
1051 return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1052 }
1053
1054 static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
1055 final Status status)
1056 throws IOException {
1057 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
1058 opts.perClientRunRows + " rows");
1059 long totalElapsedTime = 0;
1060
1061 final Test t;
1062 try {
1063 Constructor<? extends Test> constructor =
1064 cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
1065 t = constructor.newInstance(conf, opts, status);
1066 } catch (NoSuchMethodException e) {
1067 throw new IllegalArgumentException("Invalid command class: " +
1068 cmd.getName() + ". It does not provide a constructor as described by " +
1069 "the javadoc comment. Available constructors are: " +
1070 Arrays.toString(cmd.getConstructors()));
1071 } catch (Exception e) {
1072 throw new IllegalStateException("Failed to construct command class", e);
1073 }
1074 totalElapsedTime = t.test();
1075
1076 status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1077 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
1078 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
1079 return totalElapsedTime;
1080 }
1081
1082 private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
1083 InterruptedException, ClassNotFoundException {
1084 HBaseAdmin admin = null;
1085 try {
1086 admin = new HBaseAdmin(getConf());
1087 checkTable(admin, opts);
1088 } finally {
1089 if (admin != null) admin.close();
1090 }
1091 if (opts.nomapred) {
1092 doLocalClients(cmd, opts);
1093 } else {
1094 doMapReduce(cmd, opts);
1095 }
1096 }
1097
1098 protected void printUsage() {
1099 printUsage(null);
1100 }
1101
1102 protected void printUsage(final String message) {
1103 if (message != null && message.length() > 0) {
1104 System.err.println(message);
1105 }
1106 System.err.println("Usage: java " + this.getClass().getName() + " \\");
1107 System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\");
1108 System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " +
1109 "[-D<property=value>]* <command> <nclients>");
1110 System.err.println();
1111 System.err.println("Options:");
1112 System.err.println(" nomapred Run multiple clients using threads " +
1113 "(rather than use mapreduce)");
1114 System.err.println(" rows Rows each client runs. Default: One million");
1115 System.err.println(" sampleRate Execute test on a sample of total " +
1116 "rows. Only supported by randomRead. Default: 1.0");
1117 System.err.println(" table Alternate table name. Default: 'TestTable'");
1118 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
1119 System.err.println(" flushCommits Used to determine if the test should flush the table. " +
1120 "Default: false");
1121 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
1122 System.err.println(" presplit Create presplit table. Recommended for accurate perf " +
1123 "analysis (see guide). Default: disabled");
1124 System.err.println(" inmemory Tries to keep the HFiles of the CF " +
1125 "inmemory as far as possible. Not guaranteed that reads are always served " +
1126 "from memory. Default: false");
1127 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
1128 "Default: false");
1129 System.err.println(" numoftags Specify the no of tags that would be needed. " +
1130 "This works only if usetags is true.");
1131 System.err.println(" filterAll Helps to filter out all the rows on the server side"
1132 + " there by not returning any thing back to the client. Helps to check the server side"
1133 + " performance. Uses FilterAllFilter internally. ");
1134 System.err.println(" latency Set to report operation latencies. " +
1135 "Currently only supported by randomRead test. Default: False");
1136 System.err.println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values()));
1137 System.err.println();
1138 System.err.println(" Note: -D properties will be applied to the conf used. ");
1139 System.err.println(" For example: ");
1140 System.err.println(" -Dmapred.output.compress=true");
1141 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
1142 System.err.println(" -Dmapreduce.task.timeout=60000");
1143 System.err.println();
1144 System.err.println("Command:");
1145 for (CmdDescriptor command : commands.values()) {
1146 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1147 }
1148 System.err.println();
1149 System.err.println("Args:");
1150 System.err.println(" nclients Integer. Required. Total number of " +
1151 "clients (and HRegionServers)");
1152 System.err.println(" running: 1 <= value <= 500");
1153 System.err.println("Examples:");
1154 System.err.println(" To run a single evaluation client:");
1155 System.err.println(" $ bin/hbase " + this.getClass().getName()
1156 + " sequentialWrite 1");
1157 }
1158
1159 private static int getNumClients(final int start, final String[] args) {
1160 if(start + 1 > args.length) {
1161 throw new IllegalArgumentException("must supply the number of clients");
1162 }
1163 int N = Integer.parseInt(args[start]);
1164 if (N < 1) {
1165 throw new IllegalArgumentException("Number of clients must be > 1");
1166 }
1167 return N;
1168 }
1169
1170 public int run(String[] args) throws Exception {
1171
1172
1173 int errCode = -1;
1174 if (args.length < 1) {
1175 printUsage();
1176 return errCode;
1177 }
1178
1179 try {
1180
1181
1182
1183
1184
1185
1186 TestOptions opts = new TestOptions();
1187
1188 for (int i = 0; i < args.length; i++) {
1189 String cmd = args[i];
1190 if (cmd.equals("-h") || cmd.startsWith("--h")) {
1191 printUsage();
1192 errCode = 0;
1193 break;
1194 }
1195
1196 final String nmr = "--nomapred";
1197 if (cmd.startsWith(nmr)) {
1198 opts.nomapred = true;
1199 continue;
1200 }
1201
1202 final String rows = "--rows=";
1203 if (cmd.startsWith(rows)) {
1204 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
1205 continue;
1206 }
1207
1208 final String sampleRate = "--sampleRate=";
1209 if (cmd.startsWith(sampleRate)) {
1210 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
1211 continue;
1212 }
1213
1214 final String table = "--table=";
1215 if (cmd.startsWith(table)) {
1216 opts.tableName = cmd.substring(table.length());
1217 continue;
1218 }
1219
1220 final String compress = "--compress=";
1221 if (cmd.startsWith(compress)) {
1222 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
1223 continue;
1224 }
1225
1226 final String blockEncoding = "--blockEncoding=";
1227 if (cmd.startsWith(blockEncoding)) {
1228 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
1229 continue;
1230 }
1231
1232 final String flushCommits = "--flushCommits=";
1233 if (cmd.startsWith(flushCommits)) {
1234 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1235 continue;
1236 }
1237
1238 final String writeToWAL = "--writeToWAL=";
1239 if (cmd.startsWith(writeToWAL)) {
1240 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1241 continue;
1242 }
1243
1244 final String presplit = "--presplit=";
1245 if (cmd.startsWith(presplit)) {
1246 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
1247 continue;
1248 }
1249
1250 final String inMemory = "--inmemory=";
1251 if (cmd.startsWith(inMemory)) {
1252 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
1253 continue;
1254 }
1255
1256 final String latency = "--latency";
1257 if (cmd.startsWith(latency)) {
1258 opts.reportLatency = true;
1259 continue;
1260 }
1261
1262 final String multiGet = "--multiGet=";
1263 if (cmd.startsWith(multiGet)) {
1264 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
1265 continue;
1266 }
1267
1268 final String useTags = "--usetags=";
1269 if (cmd.startsWith(useTags)) {
1270 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
1271 continue;
1272 }
1273
1274 final String noOfTags = "--numoftags=";
1275 if (cmd.startsWith(noOfTags)) {
1276 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
1277 continue;
1278 }
1279
1280 final String filterOutAll = "--filterAll";
1281 if (cmd.startsWith(filterOutAll)) {
1282 opts.filterAll = true;
1283 continue;
1284 }
1285
1286 final String bloomFilter = "--bloomFilter";
1287 if (cmd.startsWith(bloomFilter)) {
1288 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
1289 continue;
1290 }
1291
1292 final String addColumns = "--addColumns=";
1293 if (cmd.startsWith(addColumns)) {
1294 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
1295 continue;
1296 }
1297
1298 Class<? extends Test> cmdClass = determineCommandClass(cmd);
1299 if (cmdClass != null) {
1300 opts.numClientThreads = getNumClients(i + 1, args);
1301
1302 opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
1303 runTest(cmdClass, opts);
1304 errCode = 0;
1305 break;
1306 }
1307
1308 printUsage();
1309 break;
1310 }
1311 } catch (Exception e) {
1312 e.printStackTrace();
1313 }
1314
1315 return errCode;
1316 }
1317
1318 private Class<? extends Test> determineCommandClass(String cmd) {
1319 CmdDescriptor descriptor = commands.get(cmd);
1320 return descriptor != null ? descriptor.getCmdClass() : null;
1321 }
1322
1323 public static void main(final String[] args) throws Exception {
1324 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
1325 System.exit(res);
1326 }
1327 }