/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

/**
 * Integration test that writes a very large, circular linked list into HBase with a
 * map-only MapReduce job (Generator) and then checks the list for holes with a second
 * MapReduce job (Verify).
 *
 * Each node is a row keyed by a random 16-byte key. The "meta" column family stores:
 * <ul>
 *   <li>prev   - the key of the previous node in the list</li>
 *   <li>count  - the number of nodes the generating mapper had written so far</li>
 *   <li>client - an identifier of the job/task that wrote the row</li>
 * </ul>
 *
 * Verify emits a "defined" marker for every row plus one reference per prev pointer and
 * reduces on the row key: every referenced key must exist (no UNDEFINED nodes) and the
 * number of REFERENCED nodes must equal the number of generated nodes.
 *
 * The tool accepts the sub-commands Generator, Verify, Loop, Walker, Print, Delete and
 * Clean (see printCommands() below). The Loop command alternates Generator and Verify
 * iterations and is what testContinuousIngest() runs.
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  /** Sentinel 'prev' value (one zero byte) used when a node has no previous batch to point at. */
  protected static final byte[] NO_KEY = new byte[1];

  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";

  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";

  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");

  /** Row key of the previous node in the linked list. */
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  /** Identifier of the job/task that wrote the node. */
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  /** Running count of nodes written by the generating mapper. */
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  /** How many rows to write per map task. */
  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
      = "IntegrationTestBigLinkedList.generator.num_rows";

  /** How many map tasks the Generator job runs. */
  private static final String GENERATOR_NUM_MAPPERS_KEY
      = "IntegrationTestBigLinkedList.generator.map.tasks";

  /** Number of nodes buffered and persisted per batch. */
  private static final String GENERATOR_WIDTH_KEY
      = "IntegrationTestBigLinkedList.generator.width";

  /** Multiplier of the width; after width * wrap nodes the list is closed into a loop. */
  private static final String GENERATOR_WRAP_KEY
      = "IntegrationTestBigLinkedList.generator.wrap";

  /** Cluster size used when not running against a distributed cluster. */
  protected int NUM_SLAVES_BASE = 3;

  private static final int MISSING_ROWS_TO_LOG = 50;

  private static final int WIDTH_DEFAULT = 1000000;
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  protected String toRun;
  protected String[] otherArgs;

  /** A single node of the linked list, as read back from the table. */
  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }
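
  // Illustrative sketch of one node row (the layout is taken from the code below, the
  // concrete values are made up, not real output):
  //   row key      : 16 random bytes (ROWKEY_LENGTH)
  //   meta:prev    : key of the node in the same slot of the previously written batch
  //   meta:count   : how many nodes the generating mapper had written at persist time
  //   meta:client  : "Job: <job id> Task: <task attempt id>" of the writer
  // getCINode(Result, CINode) further down converts a scanned row back into this struct.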

  /**
   * A Map-only job that generates the linked lists. It first dumps random row keys to
   * SequenceFiles (runRandomInputGenerator) and then replays those keys through
   * GeneratorMapper to write linked-list nodes into the table (runGenerator).
   */
  static class Generator extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Generator.class);

    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
      static class GeneratorInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
          return 1;
        }
        @Override
        public String[] getLocations() throws IOException, InterruptedException {
          return new String[0];
        }
        @Override
        public void readFields(DataInput arg0) throws IOException {
        }
        @Override
        public void write(DataOutput arg0) throws IOException {
        }
      }

      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
        private long count;
        private long numNodes;
        private Random rand;

        @Override
        public void close() throws IOException {
        }

        @Override
        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
          byte[] bytes = new byte[ROWKEY_LENGTH];
          rand.nextBytes(bytes);
          return new BytesWritable(bytes);
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
          return NullWritable.get();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return (float)(count / (double)numNodes);
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext context)
            throws IOException, InterruptedException {
          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
          rand = new Random();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return count++ < numNodes;
        }

      }

      @Override
      public RecordReader<BytesWritable,NullWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
        return rr;
      }

      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);

        ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);

        for (int i = 0; i < numMappers; i++) {
          splits.add(new GeneratorInputSplit());
        }

        return splits;
      }
    }

    /** Keeps the SequenceFiles unsplit so each mapper replays exactly one file of random keys. */
    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }
    }

    /**
     * Turns the stream of random keys into linked-list segments. Keys are buffered
     * 'width' at a time; when a batch fills up it is persisted, with each node's prev
     * column pointing at the node in the same position of the previous batch. Every
     * 'wrap' (= width * wrap multiplier) nodes the first batch is circularly shifted
     * and persisted once more so the segment closes into a single loop.
     */
    static class GeneratorMapper
        extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {

      byte[][] first = null;
      byte[][] prev = null;
      byte[][] current = null;
      byte[] id;
      long count = 0;
      int i;
      HTable table;
      long numNodes;
      long wrap;
      int width;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
        Configuration conf = context.getConfiguration();
        instantiateHTable(conf);
        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
        current = new byte[this.width][];
        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
        this.wrap = (long)wrapMultiplier * width;
        this.numNodes = context.getConfiguration().getLong(
            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
        if (this.numNodes < this.wrap) {
          this.wrap = this.numNodes;
        }
      }
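
      // With the defaults above (width = 1,000,000, wrap multiplier = 25) wrap works out
      // to 25,000,000, which matches the default number of rows per mapper, so each
      // mapper produces one full loop. Smaller, purely illustrative numbers: width 4
      // with multiplier 2 would close a loop after every 8 nodes.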

      protected void instantiateHTable(Configuration conf) throws IOException {
        table = new HTable(conf, getTableName(conf));
        table.setAutoFlush(false, true);
        table.setWriteBufferSize(4 * 1024 * 1024);
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        table.close();
      }

      @Override
      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
        current[i] = new byte[key.getLength()];
        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
        if (++i == current.length) {
          persist(output, count, prev, current, id);
          i = 0;

          if (first == null)
            first = current;
          prev = current;
          current = new byte[this.width][];

          count += current.length;
          output.setStatus("Count " + count);

          if (count % wrap == 0) {
            // Close the loop: re-persist the first batch, circularly shifted, so its
            // prev pointers link back into the last batch written.
            circularLeftShift(first);

            persist(output, -1, prev, first, null);

            first = null;
            prev = null;
          }
        }
      }

      private static <T> void circularLeftShift(T[] first) {
        T ez = first[0];
        System.arraycopy(first, 1, first, 0, first.length - 1);
        first[first.length - 1] = ez;
      }
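
      // Hypothetical example: circularLeftShift on {k0, k1, k2, k3} yields {k1, k2, k3, k0}.
      // Re-persisting the shifted first batch links each original chain's head to the
      // neighbouring chain's tail, so the 'width' parallel chains close into one cycle
      // of 'wrap' nodes.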

      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
          throws IOException {
        for (int i = 0; i < current.length; i++) {
          Put put = new Put(current[i]);
          put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

          if (count >= 0) {
            put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
          }
          if (id != null) {
            put.add(FAMILY_NAME, COLUMN_CLIENT, id);
          }
          table.put(put);

          if (i % 1000 == 0) {
            // Tickle progress every so often, else the framework may decide the task is hung.
            output.progress();
          }
        }

        table.flushCommits();
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 3) {
        System.out.println("Usage : " + Generator.class.getSimpleName() +
            " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
        System.out.println(" where <num nodes per map> should be a multiple of " +
            " width*wrap multiplier, 25M by default");
        return 0;
      }

      int numMappers = Integer.parseInt(args[0]);
      long numNodes = Long.parseLong(args[1]);
      Path tmpOutput = new Path(args[2]);
      Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
      Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
      return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }

    protected void createSchema() throws IOException {
      Configuration conf = getConf();
      HBaseAdmin admin = new HBaseAdmin(conf);
      TableName tableName = getTableName(conf);
      try {
        if (!admin.tableExists(tableName)) {
          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
          // Pre-split the table so the generated load spreads across all live region servers.
          int numberOfServers = admin.getClusterStatus().getServers().size();
          if (numberOfServers == 0) {
            throw new IllegalStateException("No live regionservers");
          }
          int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
              HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
          int totalNumberOfRegions = numberOfServers * regionsPerServer;
          LOG.info("Number of live regionservers: " + numberOfServers + ", " +
              "pre-splitting table into " + totalNumberOfRegions + " regions " +
              "(default regions per server: " + regionsPerServer + ")");

          byte[][] splits = new RegionSplitter.UniformSplit().split(
              totalNumberOfRegions);

          admin.createTable(htd, splits);
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      } finally {
        admin.close();
      }
    }

    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
          + ", numNodes=" + numNodes);
      Job job = new Job(getConf());

      job.setJobName("Random Input Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      job.setInputFormatClass(GeneratorInputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      // Identity mapper: just dump the random keys into SequenceFiles.
      job.setMapperClass(Mapper.class);

      FileOutputFormat.setOutputPath(job, tmpOutput);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      createSchema();
      Job job = new Job(getConf());

      job.setJobName("Link Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      FileInputFormat.setInputPaths(job, tmpOutput);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      setMapperForGenerator(job);

      job.setOutputFormatClass(NullOutputFormat.class);

      job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
        ClassNotFoundException {
      boolean success = job.waitForCompletion(true);
      return success;
    }

    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }

    public int run(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      // Two phases: dump random keys to SequenceFiles, then replay them into the table.
      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
      if (ret > 0) {
        return ret;
      }
      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }
  }

  /**
   * A Map Reduce job that verifies the generated linked lists have no holes. The mapper
   * emits a DEF marker for every row it sees plus one reference for the row named in its
   * prev column; the reducer then checks that every referenced key was also defined and
   * tallies the result (see Counts below).
   */
  static class Verify extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Verify.class);
    protected static final BytesWritable DEF = new BytesWritable(NO_KEY);

    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        context.write(row, DEF);
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }
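
    // A tiny worked example with made-up keys: for a healthy three-node loop
    // A <- B <- C <- A, VerifyMapper emits (A,DEF) (B,DEF) (C,DEF) plus the references
    // (A,B) (B,C) (C,A). Every reducer key then sees one DEF and one reference and counts
    // REFERENCED; a reference with no DEF is UNDEFINED, a DEF with no reference UNREFERENCED.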

    public static enum Counts {
      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
    }
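
    // REFERENCED      : the node exists and at least one other node points at it (good).
    // UNREFERENCED    : the node exists but nothing points at it.
    // UNDEFINED       : something points at a node that was never written (a hole).
    // EXTRAREFERENCES : surplus references beyond the first one to the same node.
    // CORRUPT is declared here but never incremented by this reducer.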

    public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
      private ArrayList<byte[]> refs = new ArrayList<byte[]>();

      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException {

        int defCount = 0;

        refs.clear();
        for (BytesWritable type : values) {
          if (type.getLength() == DEF.getLength()) {
            defCount++;
          } else {
            byte[] bytes = new byte[type.getLength()];
            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
            refs.add(bytes);
          }
        }

        // Log anything that is not the expected case of a defined row with exactly one
        // reference, together with the full list of referencing keys.
        StringBuilder refsSb = null;
        String keyString = null;
        if (defCount == 0 || refs.size() != 1) {
          refsSb = new StringBuilder();
          String comma = "";
          for (byte[] ref : refs) {
            refsSb.append(comma);
            comma = ",";
            refsSb.append(Bytes.toStringBinary(ref));
          }
          keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());

          LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
        }

        if (defCount == 0 && refs.size() > 0) {
          // Bad: the row is referenced by other rows but was never written.
          context.write(new Text(keyString), new Text(refsSb.toString()));
          context.getCounter(Counts.UNDEFINED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("undef", keyString).increment(1);
          }
        } else if (defCount > 0 && refs.size() == 0) {
          // Also bad: the row exists but nothing references it.
          context.write(new Text(keyString), new Text("none"));
          context.getCounter(Counts.UNREFERENCED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("unref", keyString).increment(1);
          }
        } else {
          if (refs.size() > 1) {
            if (refsSb != null) {
              context.write(new Text(keyString), new Text(refsSb.toString()));
            }
            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
          }
          // The node is both defined and referenced.
          context.getCounter(Counts.REFERENCED).increment(1);
        }

      }
    }

    @Override
    public int run(String[] args) throws Exception {

      if (args.length != 2) {
        System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
        return 0;
      }

      String outputDir = args[0];
      int numReducers = Integer.parseInt(args[1]);

      return run(outputDir, numReducers);
    }

    public int run(String outputDir, int numReducers) throws Exception {
      return run(new Path(outputDir), numReducers);
    }

    public int run(Path outputDir, int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);

      job = new Job(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);

      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);

      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }

    @SuppressWarnings("deprecation")
    public boolean verify(long expectedReferenced) throws Exception {
      if (job == null) {
        throw new IllegalStateException("You should call run() first");
      }

      Counters counters = job.getCounters();

      Counter referenced = counters.findCounter(Counts.REFERENCED);
      Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
      Counter undefined = counters.findCounter(Counts.UNDEFINED);
      Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);

      boolean success = true;

      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match with actual referenced count. " +
            "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
        success = false;
      }

      if (unreferenced.getValue() > 0) {
        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
        success = false;
      }

      if (undefined.getValue() > 0) {
        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
        success = false;
      }

      if (!success) {
        handleFailure(counters);
      }
      return success;
    }
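
    // Note: verify(expected) has to run after run(); callers derive 'expected' from what
    // they generated. Loop below, for instance, passes numMappers * numNodes accumulated
    // over the Generator iterations completed so far.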

    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      HConnection conn = HConnectionManager.getConnection(conf);
      TableName tableName = getTableName(conf);
      CounterGroup g = counters.getGroup("undef");
      Iterator<Counter> it = g.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = conn.relocateRegion(tableName, key);
        LOG.error("undefined row " + keyString + ", " + loc);
      }
      g = counters.getGroup("unref");
      it = g.iterator();
      while (it.hasNext()) {
        String keyString = it.next().getName();
        byte[] key = Bytes.toBytes(keyString);
        HRegionLocation loc = conn.relocateRegion(tableName, key);
        LOG.error("unreferenced row " + keyString + ", " + loc);
      }
    }
  }

  /**
   * Executes Generator and Verify in a loop. Data is not cleaned between runs, so each
   * iteration adds more links to the table and the expected node count accumulates.
   */
  static class Loop extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Loop.class);

    IntegrationTestBigLinkedList it;

    protected void runGenerator(int numMappers, long numNodes,
        String outputDir, Integer width, Integer wrapMultiplier) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID();
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(getConf());
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    protected void runVerify(String outputDir,
        int numReducers, long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID();
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> " +
            "<output dir> <num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);

      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE;
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier);
        expectedNumNodes += numMappers * numNodes;

        runVerify(outputDir, numReducers, expectedNumNodes);
      }

      return 0;
    }
  }

  /**
   * A standalone program that prints out portions of the linked list.
   */
  private static class Print extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      Options options = new Options();
      options.addOption("s", "start", true, "start key");
      options.addOption("e", "end", true, "end key");
      options.addOption("l", "limit", true, "number to print");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      HTable table = new HTable(getConf(), getTableName(getConf()));

      Scan scan = new Scan();
      scan.setBatch(10000);

      if (cmd.hasOption("s"))
        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));

      if (cmd.hasOption("e"))
        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));

      int limit = 0;
      if (cmd.hasOption("l"))
        limit = Integer.parseInt(cmd.getOptionValue("l"));
      else
        limit = 100;

      ResultScanner scanner = table.getScanner(scan);

      CINode node = new CINode();
      Result result = scanner.next();
      int count = 0;
      while (result != null && count++ < limit) {
        node = getCINode(result, node);
        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
            Bytes.toStringBinary(node.prev), node.count, node.client);
        result = scanner.next();
      }
      scanner.close();
      table.close();

      return 0;
    }
  }

  /**
   * A standalone program that deletes a single node.
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete
          = new org.apache.hadoop.hbase.client.Delete(val);

      HTable table = new HTable(getConf(), getTableName(getConf()));

      table.delete(delete);
      table.flushCommits();
      table.close();

      System.out.println("Delete successful");
      return 0;
    }
  }

  /**
   * A standalone program that follows a linked list from a start node (random unless -s
   * is given) and prints timing information for every lookup.
   */
  private static class Walker extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      Random rand = new Random();
      boolean isSpecificStart = cmd.hasOption('s');
      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      HTable table = new HTable(getConf(), getTableName(getConf()));
      long numQueries = 0;

      // If a start key is given, walk from there exactly once; otherwise keep picking
      // random start points until maxQueries lookups have been made.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          rand.nextBytes(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = System.currentTimeMillis();
          node = getNode(prev, table, node);
          long t2 = System.currentTimeMillis();
          if (numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }

      table.close();
      return 0;
    }

    private static CINode findStartNode(HTable table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = System.currentTimeMillis();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = System.currentTimeMillis();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    private CINode getNode(byte[] row, HTable table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }

  /**
   * A standalone program that deletes the test table and removes the given output directory.
   */
  private static class Clean extends Configured implements Tool {

    @Override public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);

      FileSystem fs = HFileSystem.get(conf);
      HBaseAdmin admin = new HBaseAdmin(conf);
      try {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }
      } finally {
        admin.close();
      }

      if (fs.exists(p)) {
        fs.delete(p, true);
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  protected IntegrationTestingUtility util;

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    // The mini MapReduce cluster is only started in the non-distributed case above,
    // so only shut it down in that case.
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  @Test
  public void testContinuousIngest() throws Exception {
    // Loop args: <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
        new String[] {"1", "1", "2000000",
            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println(" Generator  Map only job that generates data.");
    System.err.println(" Verify     A map reduce job that looks for holes. Look at the counts ");
    System.err.println("            after running. REFERENCED and UNREFERENCED are ok. Any ");
    System.err.println("            UNDEFINED counts are bad. Do not run with the Generator.");
    System.err.println(" Walker     " +
        "Standalone program that starts following a linked list and emits timing info.");
    System.err.println(" Print      Standalone program that prints nodes in the linked list.");
    System.err.println(" Delete     Standalone program that deletes a single node.");
    System.err.println(" Loop       Program to loop through Generator and Verify steps.");
    System.err.println(" Clean      Program to clean all left over detritus.");
    System.err.flush();
  }
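
  // For illustration only (the paths and sizes below are hypothetical, not defaults): the
  // sub-commands map onto the Tool usage strings above, e.g.
  //   Generator 4 1000000 /tmp/biglinkedlist-input
  //   Verify /tmp/biglinkedlist-verify 4
  //   Loop 2 4 1000000 /tmp/biglinkedlist 4
  // A Loop run of 2 iterations with 4 mappers of 1,000,000 nodes each should end with
  // REFERENCED = 8,000,000 on the second Verify pass.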

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();

    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName() +
          " <general options> COMMAND [<COMMAND options>]", "General options:", "");
      printCommands();
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {

    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public String getTablename() {
    Configuration c = getConf();
    return c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME);
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
  }

  private static void setJobConf(Job job, int numMappers, long numNodes,
      Integer width, Integer wrapMultiplier) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}