1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.test;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Random;
29  import java.util.Set;
30  import java.util.UUID;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.commons.cli.CommandLine;
34  import org.apache.commons.cli.GnuParser;
35  import org.apache.commons.cli.HelpFormatter;
36  import org.apache.commons.cli.Options;
37  import org.apache.commons.cli.ParseException;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.conf.Configured;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.HBaseConfiguration;
45  import org.apache.hadoop.hbase.HBaseTestingUtility;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HRegionLocation;
48  import org.apache.hadoop.hbase.HTableDescriptor;
49  import org.apache.hadoop.hbase.IntegrationTestBase;
50  import org.apache.hadoop.hbase.IntegrationTestingUtility;
51  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
52  import org.apache.hadoop.hbase.fs.HFileSystem;
53  import org.apache.hadoop.hbase.MasterNotRunningException;
54  import org.apache.hadoop.hbase.TableName;
55  import org.apache.hadoop.hbase.client.Get;
56  import org.apache.hadoop.hbase.client.HBaseAdmin;
57  import org.apache.hadoop.hbase.client.HConnection;
58  import org.apache.hadoop.hbase.client.HConnectionManager;
59  import org.apache.hadoop.hbase.client.HTable;
60  import org.apache.hadoop.hbase.client.Put;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.ResultScanner;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.client.ScannerCallable;
65  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
66  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
67  import org.apache.hadoop.hbase.mapreduce.TableMapper;
68  import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
69  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.apache.hadoop.hbase.util.RegionSplitter;
72  import org.apache.hadoop.io.BytesWritable;
73  import org.apache.hadoop.io.NullWritable;
74  import org.apache.hadoop.io.Text;
75  import org.apache.hadoop.io.Writable;
76  import org.apache.hadoop.mapreduce.Counter;
77  import org.apache.hadoop.mapreduce.CounterGroup;
78  import org.apache.hadoop.mapreduce.Counters;
79  import org.apache.hadoop.mapreduce.InputFormat;
80  import org.apache.hadoop.mapreduce.InputSplit;
81  import org.apache.hadoop.mapreduce.Job;
82  import org.apache.hadoop.mapreduce.JobContext;
83  import org.apache.hadoop.mapreduce.Mapper;
84  import org.apache.hadoop.mapreduce.RecordReader;
85  import org.apache.hadoop.mapreduce.Reducer;
86  import org.apache.hadoop.mapreduce.TaskAttemptContext;
87  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
88  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
89  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
90  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
91  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
92  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
93  import org.apache.hadoop.util.Tool;
94  import org.apache.hadoop.util.ToolRunner;
95  import org.junit.Test;
96  import org.junit.experimental.categories.Category;
97  
98  import com.google.common.collect.Sets;
99  
100 /**
101  * This is an integration test borrowed from goraci, written by Keith Turner,
102  * which is in turn inspired by the Accumulo test called continuous ingest (ci).
103  * The original source code can be found here:
104  * https://github.com/keith-turner/goraci
105  * https://github.com/enis/goraci/
106  *
107  * Apache Accumulo [0] has a simple test suite that verifies that data is not
108  * lost at scale. This test suite is called continuous ingest. This test runs
109  * many ingest clients that continually create linked lists containing 25
110  * million nodes. At some point the clients are stopped and a map reduce job is
111  * run to ensure no linked list has a hole. A hole indicates data was lost.
112  *
113  * The nodes in the linked list are random. This causes each linked list to
114  * spread across the table. Therefore if one part of a table loses data, then it
115  * will be detected by references in another part of the table.
116  *
117  * THE ANATOMY OF THE TEST
118  *
119  * Below is a rough sketch of how data is written. For specific details look at
120  * the Generator code.
121  *
122  * 1. Write out 1 million nodes.  2. Flush the client.  3. Write out 1 million that
123  * reference the previous million.  4. If this is the 25th set of 1 million nodes,
124  * then update the 1st set of 1 million to point to the last.  5. Go to step 1.
125  *
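     * With the default width of 1,000,000 nodes per batch and a wrap multiplier of
     * 25 (see GENERATOR_WIDTH_KEY and GENERATOR_WRAP_KEY below), one circular list
     * is closed every width * wrap = 25,000,000 nodes, which is why each map task
     * should write a multiple of 25M rows.
     *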
126  * The key is that nodes only reference flushed nodes. Therefore a node should
127  * never reference a missing node, even if the ingest client is killed at any
128  * point in time.
129  *
130  * When running this test suite with Accumulo, there is a script running in
131  * parallel called the Agitator that randomly and continuously kills server
132  * processes. The outcome was that many data loss bugs were found in Accumulo
133  * by doing this. This test suite can also help find bugs that impact uptime
134  * and stability when run for days or weeks.
135  *
136  * This test suite consists of the following: a few Java programs, a little
137  * helper script to run the Java programs, and a Maven script to build it all.
138  *
139  * When generating data, it's best to have each map task generate a multiple of
140  * 25 million. The reason for this is that circular linked lists are generated
141  * every 25M. Not generating a multiple of 25M will result in some nodes in the
142  * linked list not having references. The loss of an unreferenced node cannot
143  * be detected.
144  *
145  *
146  * Below is a description of the Java programs
147  *
148  * Generator - A map only job that generates data. As stated previously,
149  * it's best to generate data in multiples of 25M.
150  *
151  * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
152  * UNREFERENCED are OK; any UNDEFINED counts are bad. Do not run at the same
153  * time as the Generator.
154  *
155  * Walker - A standalone program that starts following a linked list and emits timing info.
156  *
157  * Print - A standalone program that prints nodes in the linked list
158  *
159  * Delete - A standalone program that deletes a single node
160  *
161  * This class can be run as a unit test, as an integration test, or from the command line.
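     *
     * For example, a single short Loop iteration can be started from the shell as
     * follows (assuming the standard HBase launcher script is on the PATH; the
     * output path and sizes here are only illustrative):
     *
     *   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
     *       Loop 1 4 25000000 /tmp/itbll-output 1
     *
     * i.e. Loop followed by num iterations, num mappers, num nodes per mapper,
     * output dir and num reducers, optionally followed by width and wrap multiplier.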
162  */
163 @Category(IntegrationTests.class)
164 public class IntegrationTestBigLinkedList extends IntegrationTestBase {
165   protected static final byte[] NO_KEY = new byte[1];
166 
167   protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
168 
169   protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
170 
171   protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
172 
173   //link to the id of the prev node in the linked list
174   protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
175 
176   //identifier of the mapred task that generated this row
177   protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
178 
179   //the id of the row within the same client.
180   protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
181 
182   /** How many rows to write per map task. This has to be a multiple of width * wrap multiplier (25M by default). */
183   private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
184     = "IntegrationTestBigLinkedList.generator.num_rows";
185 
186   private static final String GENERATOR_NUM_MAPPERS_KEY
187     = "IntegrationTestBigLinkedList.generator.map.tasks";
188 
189   private static final String GENERATOR_WIDTH_KEY
190     = "IntegrationTestBigLinkedList.generator.width";
191 
192   private static final String GENERATOR_WRAP_KEY
193     = "IntegrationTestBigLinkedList.generator.wrap";
194 
195   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
196 
197   private static final int MISSING_ROWS_TO_LOG = 50;
198 
199   private static final int WIDTH_DEFAULT = 1000000;
200   private static final int WRAP_DEFAULT = 25;
201   private static final int ROWKEY_LENGTH = 16;
202 
203   protected String toRun;
204   protected String[] otherArgs;
205 
206   static class CINode {
207     byte[] key;
208     byte[] prev;
209     String client;
210     long count;
211   }
212 
213   /**
214    * A Map only job that generates random linked list and stores them.
215    */
216   static class Generator extends Configured implements Tool {
217 
218     private static final Log LOG = LogFactory.getLog(Generator.class);
219 
220     static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
221       static class GeneratorInputSplit extends InputSplit implements Writable {
222         @Override
223         public long getLength() throws IOException, InterruptedException {
224           return 1;
225         }
226         @Override
227         public String[] getLocations() throws IOException, InterruptedException {
228           return new String[0];
229         }
230         @Override
231         public void readFields(DataInput arg0) throws IOException {
232         }
233         @Override
234         public void write(DataOutput arg0) throws IOException {
235         }
236       }
237 
238       static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
239         private long count;
240         private long numNodes;
241         private Random rand;
242 
243         @Override
244         public void close() throws IOException {
245         }
246 
247         @Override
248         public BytesWritable getCurrentKey() throws IOException, InterruptedException {
249           byte[] bytes = new byte[ROWKEY_LENGTH];
250           rand.nextBytes(bytes);
251           return new BytesWritable(bytes);
252         }
253 
254         @Override
255         public NullWritable getCurrentValue() throws IOException, InterruptedException {
256           return NullWritable.get();
257         }
258 
259         @Override
260         public float getProgress() throws IOException, InterruptedException {
261           return (float)(count / (double)numNodes);
262         }
263 
264         @Override
265         public void initialize(InputSplit arg0, TaskAttemptContext context)
266             throws IOException, InterruptedException {
267           numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
268           rand = new Random();
269         }
270 
271         @Override
272         public boolean nextKeyValue() throws IOException, InterruptedException {
273           return count++ < numNodes;
274         }
275 
276       }
277 
278       @Override
279       public RecordReader<BytesWritable,NullWritable> createRecordReader(
280           InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
281         GeneratorRecordReader rr = new GeneratorRecordReader();
282         rr.initialize(split, context);
283         return rr;
284       }
285 
286       @Override
287       public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
288         int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
289 
290         ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);
291 
292         for (int i = 0; i < numMappers; i++) {
293           splits.add(new GeneratorInputSplit());
294         }
295 
296         return splits;
297       }
298     }
299 
300     /** Ensures each map task of this job reads exactly one whole output file from the previous job, i.e. the files are not split. */
301     static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
302       @Override
303       protected boolean isSplitable(JobContext context, Path filename) {
304         return false;
305       }
306     }
307 
308     /**
309      * Some ASCII art time:
310      * [ . . . ] represents one batch of random longs of length WIDTH
311      *
312      *                _________________________
313      *               |                  ______ |
314      *               |                 |      ||
315      *             .-+-----------------+-----.||
316      *             | |                 |     |||
317      * first   = [ . . . . . . . . . . . ]   |||
318      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
319      *             | | | | | | | | | | |     |||
320      * prev    = [ . . . . . . . . . . . ]   |||
321      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
322      *             | | | | | | | | | | |     |||
323      * current = [ . . . . . . . . . . . ]   |||
324      *                                       |||
325      * ...                                   |||
326      *                                       |||
327      * last    = [ . . . . . . . . . . . ]   |||
328      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
329      *             |                 |________||
330      *             |___________________________|
331      */
332     static class GeneratorMapper
333       extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
334 
335       byte[][] first = null;
336       byte[][] prev = null;
337       byte[][] current = null;
338       byte[] id;
339       long count = 0;
340       int i;
341       HTable table;
342       long numNodes;
343       long wrap;
344       int width;
345 
346       @Override
347       protected void setup(Context context) throws IOException, InterruptedException {
348         id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
349         Configuration conf = context.getConfiguration();
350         instantiateHTable(conf);
351         this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
352         current = new byte[this.width][];
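            // wrap is the total number of nodes in one circular linked list,
            // i.e. width * wrapMultiplier (25,000,000 with the defaults); it is
            // capped at numNodes below so that a mapper writing fewer rows than
            // that still closes the single list it produces.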
353         int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
354         this.wrap = (long)wrapMultiplier * width;
355         this.numNodes = context.getConfiguration().getLong(
356             GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
357         if (this.numNodes < this.wrap) {
358           this.wrap = this.numNodes;
359         }
360       }
361 
362       protected void instantiateHTable(Configuration conf) throws IOException {
363         table = new HTable(conf, getTableName(conf));
364         table.setAutoFlush(false, true);
365         table.setWriteBufferSize(4 * 1024 * 1024);
366       }
367 
368       @Override
369       protected void cleanup(Context context) throws IOException ,InterruptedException {
370         table.close();
371       }
372 
373       @Override
374       protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
375         current[i] = new byte[key.getLength()];
376         System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
377         if (++i == current.length) {
378           persist(output, count, prev, current, id);
379           i = 0;
380 
381           if (first == null)
382             first = current;
383           prev = current;
384           current = new byte[this.width][];
385 
386           count += current.length;
387           output.setStatus("Count " + count);
388 
389           if (count % wrap == 0) {
390             // this block of code turns the 1 million linked lists of length 25 into one giant
391             // circular linked list of 25 million nodes
392             circularLeftShift(first);
393 
394             persist(output, -1, prev, first, null);
395 
396             first = null;
397             prev = null;
398           }
399         }
400       }
401 
402       private static <T> void circularLeftShift(T[] first) {
403         T ez = first[0];
404         System.arraycopy(first, 1, first, 0, first.length - 1);
405         first[first.length - 1] = ez;
406       }
407 
408       protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
409           throws IOException {
410         for (int i = 0; i < current.length; i++) {
411           Put put = new Put(current[i]);
412           put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
413 
414           if (count >= 0) {
415             put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
416           }
417           if (id != null) {
418             put.add(FAMILY_NAME, COLUMN_CLIENT, id);
419           }
420           table.put(put);
421 
422           if (i % 1000 == 0) {
423             // Tickle progress every so often or else the framework will think the task is hung
424             output.progress();
425           }
426         }
427 
428         table.flushCommits();
429       }
430     }
431 
432     @Override
433     public int run(String[] args) throws Exception {
434       if (args.length < 3) {
435         System.out.println("Usage : " + Generator.class.getSimpleName() +
436             " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
437         System.out.println("   where <num nodes per map> should be a multiple of " +
438             " width*wrap multiplier, 25M by default");
439         return 0;
440       }
441 
442       int numMappers = Integer.parseInt(args[0]);
443       long numNodes = Long.parseLong(args[1]);
444       Path tmpOutput = new Path(args[2]);
445       Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
446       Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
447       return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
448     }
449 
450     protected void createSchema() throws IOException {
451       Configuration conf = getConf();
452       HBaseAdmin admin = new HBaseAdmin(conf);
453       TableName tableName = getTableName(conf);
454       try {
455         if (!admin.tableExists(tableName)) {
456           HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
457           htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
458           int numberOfServers = admin.getClusterStatus().getServers().size();
459           if (numberOfServers == 0) {
460             throw new IllegalStateException("No live regionservers");
461           }
462           int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
463                                 HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
464           int totalNumberOfRegions = numberOfServers * regionsPerServer;
465           LOG.info("Number of live regionservers: " + numberOfServers + ", " +
466               "pre-splitting table into " + totalNumberOfRegions + " regions " +
467               "(default regions per server: " + regionsPerServer + ")");
468 
469           byte[][] splits = new RegionSplitter.UniformSplit().split(
470               totalNumberOfRegions);
471 
472           admin.createTable(htd, splits);
473         }
474       } catch (MasterNotRunningException e) {
475         LOG.error("Master not running", e);
476         throw new IOException(e);
477       } finally {
478         admin.close();
479       }
480     }
481 
482     public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
483         Integer width, Integer wrapMultiplier) throws Exception {
484       LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
485           + ", numNodes=" + numNodes);
486       Job job = new Job(getConf());
487 
488       job.setJobName("Random Input Generator");
489       job.setNumReduceTasks(0);
490       job.setJarByClass(getClass());
491 
492       job.setInputFormatClass(GeneratorInputFormat.class);
493       job.setOutputKeyClass(BytesWritable.class);
494       job.setOutputValueClass(NullWritable.class);
495 
496       setJobConf(job, numMappers, numNodes, width, wrapMultiplier);
497 
498       job.setMapperClass(Mapper.class); //identity mapper
499 
500       FileOutputFormat.setOutputPath(job, tmpOutput);
501       job.setOutputFormatClass(SequenceFileOutputFormat.class);
502 
503       boolean success = jobCompletion(job);
504 
505       return success ? 0 : 1;
506     }
507 
508     public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
509         Integer width, Integer wrapMultiplier) throws Exception {
510       LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
511       createSchema();
512       Job job = new Job(getConf());
513 
514       job.setJobName("Link Generator");
515       job.setNumReduceTasks(0);
516       job.setJarByClass(getClass());
517 
518       FileInputFormat.setInputPaths(job, tmpOutput);
519       job.setInputFormatClass(OneFilePerMapperSFIF.class);
520       job.setOutputKeyClass(NullWritable.class);
521       job.setOutputValueClass(NullWritable.class);
522 
523       setJobConf(job, numMappers, numNodes, width, wrapMultiplier);
524 
525       setMapperForGenerator(job);
526 
527       job.setOutputFormatClass(NullOutputFormat.class);
528 
529       job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
530       TableMapReduceUtil.addDependencyJars(job);
531       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
532       TableMapReduceUtil.initCredentials(job);
533 
534       boolean success = jobCompletion(job);
535 
536       return success ? 0 : 1;
537     }
538 
539     protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
540         ClassNotFoundException {
541       boolean success = job.waitForCompletion(true);
542       return success;
543     }
544 
545     protected void setMapperForGenerator(Job job) {
546       job.setMapperClass(GeneratorMapper.class);
547     }
548 
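        // Runs in two phases: a map-only job first writes random row keys to
        // sequence files under tmpOutput, then a second job reads those files
        // back (one file per mapper) and builds the linked lists in HBase.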
549     public int run(int numMappers, long numNodes, Path tmpOutput,
550         Integer width, Integer wrapMultiplier) throws Exception {
551       int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
552       if (ret > 0) {
553         return ret;
554       }
555       return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
556     }
557   }
558 
559   /**
560    * A Map Reduce job that verifies that the linked lists generated by
561    * {@link Generator} do not have any holes.
562    */
563   static class Verify extends Configured implements Tool {
564 
565     private static final Log LOG = LogFactory.getLog(Verify.class);
566     protected static final BytesWritable DEF = new BytesWritable(NO_KEY);
567 
568     protected Job job;
569 
570     public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
571       private BytesWritable row = new BytesWritable();
572       private BytesWritable ref = new BytesWritable();
573 
574       @Override
575       protected void map(ImmutableBytesWritable key, Result value, Context context)
576           throws IOException ,InterruptedException {
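            // Emit (row, DEF) to mark this row as defined, and (prev, row) to
            // record that this row references prev; the reducer pairs these up.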
577         byte[] rowKey = key.get();
578         row.set(rowKey, 0, rowKey.length);
579         context.write(row, DEF);
580         byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
581         if (prev != null && prev.length > 0) {
582           ref.set(prev, 0, prev.length);
583           context.write(ref, row);
584         } else {
585           LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
586         }
587       }
588     }
589 
590     public static enum Counts {
591       UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
592     }
593 
594     public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> {
595       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
596 
597       private AtomicInteger rows = new AtomicInteger(0);
598 
599       @Override
600       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
601           throws IOException, InterruptedException {
602 
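            // For each row key, count DEF markers (the row itself exists) and
            // collect the keys of the rows that reference it. A reference with no
            // DEF means the referenced row was lost and is counted as UNDEFINED.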
603         int defCount = 0;
604 
605         refs.clear();
606         for (BytesWritable type : values) {
607           if (type.getLength() == DEF.getLength()) {
608             defCount++;
609           } else {
610             byte[] bytes = new byte[type.getLength()];
611             System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
612             refs.add(bytes);
613           }
614         }
615 
616         // TODO check for more than one def, should not happen
617 
618         StringBuilder refsSb = null;
619         String keyString = null;
620         if (defCount == 0 || refs.size() != 1) {
621           refsSb = new StringBuilder();
622           String comma = "";
623           for (byte[] ref : refs) {
624             refsSb.append(comma);
625             comma = ",";
626             refsSb.append(Bytes.toStringBinary(ref));
627           }
628           keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
629 
630           LOG.error("Linked List error: Key = " + keyString + " References = " + refsSb.toString());
631         }
632 
633         if (defCount == 0 && refs.size() > 0) {
634           // this is bad, found a node that is referenced but not defined. It must have been
635           // lost, emit some info about this node for debugging purposes.
636           context.write(new Text(keyString), new Text(refsSb.toString()));
637           context.getCounter(Counts.UNDEFINED).increment(1);
638           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
639             context.getCounter("undef", keyString).increment(1);
640           }
641         } else if (defCount > 0 && refs.size() == 0) {
642           // node is defined but not referenced
643           context.write(new Text(keyString), new Text("none"));
644           context.getCounter(Counts.UNREFERENCED).increment(1);
645           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
646             context.getCounter("unref", keyString).increment(1);
647           }
648         } else {
649           if (refs.size() > 1) {
650             if (refsSb != null) {
651               context.write(new Text(keyString), new Text(refsSb.toString()));
652             }
653             context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
654           }
655           // node is defined and referenced
656           context.getCounter(Counts.REFERENCED).increment(1);
657         }
658 
659       }
660     }
661 
662     @Override
663     public int run(String[] args) throws Exception {
664 
665       if (args.length != 2) {
666         System.out.println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
667         return 0;
668       }
669 
670       String outputDir = args[0];
671       int numReducers = Integer.parseInt(args[1]);
672 
673        return run(outputDir, numReducers);
674     }
675 
676     public int run(String outputDir, int numReducers) throws Exception {
677       return run(new Path(outputDir), numReducers);
678     }
679 
680     public int run(Path outputDir, int numReducers) throws Exception {
681       LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
682 
683       job = new Job(getConf());
684 
685       job.setJobName("Link Verifier");
686       job.setNumReduceTasks(numReducers);
687       job.setJarByClass(getClass());
688 
689       setJobScannerConf(job);
690 
691       Scan scan = new Scan();
692       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
693       scan.setCaching(10000);
694       scan.setCacheBlocks(false);
695 
696       TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
697           VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
698       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
699 
700       job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
701 
702       job.setReducerClass(VerifyReducer.class);
703       job.setOutputFormatClass(TextOutputFormat.class);
704       TextOutputFormat.setOutputPath(job, outputDir);
705 
706       boolean success = job.waitForCompletion(true);
707 
708       return success ? 0 : 1;
709     }
710 
711     @SuppressWarnings("deprecation")
712     public boolean verify(long expectedReferenced) throws Exception {
713       if (job == null) {
714         throw new IllegalStateException("You should call run() first");
715       }
716 
717       Counters counters = job.getCounters();
718 
719       Counter referenced = counters.findCounter(Counts.REFERENCED);
720       Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
721       Counter undefined = counters.findCounter(Counts.UNDEFINED);
722       Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
723 
724       boolean success = true;
725       //assert
726       if (expectedReferenced != referenced.getValue()) {
727         LOG.error("Expected referenced count does not match with actual referenced count. " +
728             "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
729         success = false;
730       }
731 
732       if (unreferenced.getValue() > 0) {
733         boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
734         LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
735             + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
736         success = false;
737       }
738 
739       if (undefined.getValue() > 0) {
740         LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
741         success = false;
742       }
743 
744       if (!success) {
745         handleFailure(counters);
746       }
747       return success;
748     }
749 
750     protected void handleFailure(Counters counters) throws IOException {
751       Configuration conf = job.getConfiguration();
752       HConnection conn = HConnectionManager.getConnection(conf);
753       TableName tableName = getTableName(conf);
754       CounterGroup g = counters.getGroup("undef");
755       Iterator<Counter> it = g.iterator();
756       while (it.hasNext()) {
757         String keyString = it.next().getName();
758         byte[] key = Bytes.toBytes(keyString);
759         HRegionLocation loc = conn.relocateRegion(tableName, key);
760         LOG.error("undefined row " + keyString + ", " + loc);
761       }
762       g = counters.getGroup("unref");
763       it = g.iterator();
764       while (it.hasNext()) {
765         String keyString = it.next().getName();
766         byte[] key = Bytes.toBytes(keyString);
767         HRegionLocation loc = conn.relocateRegion(tableName, key);
768         LOG.error("unreferred row " + keyString + ", " + loc);
769       }
770     }
771   }
772 
773   /**
774    * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
775    * adds more data.
776    */
777   static class Loop extends Configured implements Tool {
778 
779     private static final Log LOG = LogFactory.getLog(Loop.class);
780 
781     IntegrationTestBigLinkedList it;
782 
783     protected void runGenerator(int numMappers, long numNodes,
784         String outputDir, Integer width, Integer wrapMultiplier) throws Exception {
785       Path outputPath = new Path(outputDir);
786       UUID uuid = UUID.randomUUID(); //create a random UUID.
787       Path generatorOutput = new Path(outputPath, uuid.toString());
788 
789       Generator generator = new Generator();
790       generator.setConf(getConf());
791       int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
792       if (retCode > 0) {
793         throw new RuntimeException("Generator failed with return code: " + retCode);
794       }
795     }
796 
797     protected void runVerify(String outputDir,
798         int numReducers, long expectedNumNodes) throws Exception {
799       Path outputPath = new Path(outputDir);
800       UUID uuid = UUID.randomUUID(); //create a random UUID.
801       Path iterationOutput = new Path(outputPath, uuid.toString());
802 
803       Verify verify = new Verify();
804       verify.setConf(getConf());
805       int retCode = verify.run(iterationOutput, numReducers);
806       if (retCode > 0) {
807         throw new RuntimeException("Verify.run failed with return code: " + retCode);
808       }
809 
810       if (!verify.verify(expectedNumNodes)) {
811         throw new RuntimeException("Verify.verify failed");
812       }
813 
814       LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
815     }
816 
817     @Override
818     public int run(String[] args) throws Exception {
819       if (args.length < 5) {
820         System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
821         return 1;
822       }
823       LOG.info("Running Loop with args:" + Arrays.deepToString(args));
824 
825       int numIterations = Integer.parseInt(args[0]);
826       int numMappers = Integer.parseInt(args[1]);
827       long numNodes = Long.parseLong(args[2]);
828       String outputDir = args[3];
829       int numReducers = Integer.parseInt(args[4]);
830       Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
831       Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
832 
833       long expectedNumNodes = 0;
834 
835       if (numIterations < 0) {
836         numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
837       }
838 
839       for (int i = 0; i < numIterations; i++) {
840         LOG.info("Starting iteration = " + i);
841         runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier);
842         expectedNumNodes += numMappers * numNodes;
843 
844         runVerify(outputDir, numReducers, expectedNumNodes);
845       }
846 
847       return 0;
848     }
849   }
850 
851   /**
852    * A standalone program that prints out portions of a list created by {@link Generator}.
853    */
854   private static class Print extends Configured implements Tool {
855     @Override
856     public int run(String[] args) throws Exception {
857       Options options = new Options();
858       options.addOption("s", "start", true, "start key");
859       options.addOption("e", "end", true, "end key");
860       options.addOption("l", "limit", true, "number to print");
861 
862       GnuParser parser = new GnuParser();
863       CommandLine cmd = null;
864       try {
865         cmd = parser.parse(options, args);
866         if (cmd.getArgs().length != 0) {
867           throw new ParseException("Command takes no arguments");
868         }
869       } catch (ParseException e) {
870         System.err.println("Failed to parse command line " + e.getMessage());
871         System.err.println();
872         HelpFormatter formatter = new HelpFormatter();
873         formatter.printHelp(getClass().getSimpleName(), options);
874         System.exit(-1);
875       }
876 
877       HTable table = new HTable(getConf(), getTableName(getConf()));
878 
879       Scan scan = new Scan();
880       scan.setBatch(10000);
881 
882       if (cmd.hasOption("s"))
883         scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
884 
885       if (cmd.hasOption("e"))
886         scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
887 
888       int limit = 0;
889       if (cmd.hasOption("l"))
890         limit = Integer.parseInt(cmd.getOptionValue("l"));
891       else
892         limit = 100;
893 
894       ResultScanner scanner = table.getScanner(scan);
895 
896       CINode node = new CINode();
897       Result result = scanner.next();
898       int count = 0;
899       while (result != null && count++ < limit) {
900         node = getCINode(result, node);
901         System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
902             Bytes.toStringBinary(node.prev), node.count, node.client);
903         result = scanner.next();
904       }
905       scanner.close();
906       table.close();
907 
908       return 0;
909     }
910   }
911 
912   /**
913    * A stand alone program that deletes a single node.
914    */
915   private static class Delete extends Configured implements Tool {
916     @Override
917     public int run(String[] args) throws Exception {
918       if (args.length != 1) {
919         System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
920         return 0;
921       }
922       byte[] val = Bytes.toBytesBinary(args[0]);
923 
924       org.apache.hadoop.hbase.client.Delete delete
925         = new org.apache.hadoop.hbase.client.Delete(val);
926 
927       HTable table = new HTable(getConf(), getTableName(getConf()));
928 
929       table.delete(delete);
930       table.flushCommits();
931       table.close();
932 
933       System.out.println("Delete successful");
934       return 0;
935     }
936   }
937 
938   /**
939    * A standalone program that follows a linked list created by {@link Generator} and prints timing info.
940    */
941   private static class Walker extends Configured implements Tool {
942     @Override
943     public int run(String[] args) throws IOException {
944       Options options = new Options();
945       options.addOption("n", "num", true, "number of queries");
946       options.addOption("s", "start", true, "key to start at, binary string");
947       options.addOption("l", "logevery", true, "log every N queries");
948 
949       GnuParser parser = new GnuParser();
950       CommandLine cmd = null;
951       try {
952         cmd = parser.parse(options, args);
953         if (cmd.getArgs().length != 0) {
954           throw new ParseException("Command takes no arguments");
955         }
956       } catch (ParseException e) {
957         System.err.println("Failed to parse command line " + e.getMessage());
958         System.err.println();
959         HelpFormatter formatter = new HelpFormatter();
960         formatter.printHelp(getClass().getSimpleName(), options);
961         System.exit(-1);
962       }
963 
964       long maxQueries = Long.MAX_VALUE;
965       if (cmd.hasOption('n')) {
966         maxQueries = Long.parseLong(cmd.getOptionValue("n"));
967       }
968       Random rand = new Random();
969       boolean isSpecificStart = cmd.hasOption('s');
970       byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
971       int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
972 
973       HTable table = new HTable(getConf(), getTableName(getConf()));
974       long numQueries = 0;
975       // If isSpecificStart is set, only walk one list from that particular node.
976       // Note that in case of circular (or P-shaped) list it will walk forever, as is
977       // the case in normal run without startKey.
978       while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
979         if (!isSpecificStart) {
980           startKey = new byte[ROWKEY_LENGTH];
981           rand.nextBytes(startKey);
982         }
983         CINode node = findStartNode(table, startKey);
984         if (node == null && isSpecificStart) {
985           System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
986         }
987         numQueries++;
988         while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
989           byte[] prev = node.prev;
990           long t1 = System.currentTimeMillis();
991           node = getNode(prev, table, node);
992           long t2 = System.currentTimeMillis();
993           if (numQueries % logEvery == 0) {
994             System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
995           }
996           numQueries++;
997           if (node == null) {
998             System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
999           } else if (node.prev.length == NO_KEY.length) {
1000             System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1001           }
1002         }
1003       }
1004 
1005       table.close();
1006       return 0;
1007     }
1008 
1009     private static CINode findStartNode(HTable table, byte[] startKey) throws IOException {
1010       Scan scan = new Scan();
1011       scan.setStartRow(startKey);
1012       scan.setBatch(1);
1013       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1014 
1015       long t1 = System.currentTimeMillis();
1016       ResultScanner scanner = table.getScanner(scan);
1017       Result result = scanner.next();
1018       long t2 = System.currentTimeMillis();
1019       scanner.close();
1020 
1021       if (result != null) {
1022         CINode node = getCINode(result, new CINode());
1023         System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1024         return node;
1025       }
1026 
1027       System.out.println("FSR " + (t2 - t1));
1028 
1029       return null;
1030     }
1031 
1032     private CINode getNode(byte[] row, HTable table, CINode node) throws IOException {
1033       Get get = new Get(row);
1034       get.addColumn(FAMILY_NAME, COLUMN_PREV);
1035       Result result = table.get(get);
1036       return getCINode(result, node);
1037     }
1038   }
1039 
1040   private static class Clean extends Configured implements Tool {
1041 
1042     @Override public int run(String[] args) throws Exception {
1043       if (args.length < 1) {
1044         System.err.println("Usage: Clean <output dir>");
1045         return -1;
1046       }
1047 
1048       Path p = new Path(args[0]);
1049       Configuration conf = getConf();
1050       TableName tableName = getTableName(conf);
1051 
1052       FileSystem fs = HFileSystem.get(conf);
1053       HBaseAdmin admin = new HBaseAdmin(conf);
1054       try {
1055         if (admin.tableExists(tableName)) {
1056           admin.disableTable(tableName);
1057           admin.deleteTable(tableName);
1058         }
1059       } finally {
1060         admin.close();
1061       }
1062 
1063       if (fs.exists(p)) {
1064         fs.delete(p, true);
1065       }
1066 
1067       return 0;
1068     }
1069   }
1070 
1071   static TableName getTableName(Configuration conf) {
1072     return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1073   }
1074 
1075   private static CINode getCINode(Result result, CINode node) {
1076     node.key = Bytes.copy(result.getRow());
1077     if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1078       node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1079     } else {
1080       node.prev = NO_KEY;
1081     }
1082     if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1083       node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1084     } else {
1085       node.count = -1;
1086     }
1087     if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1088       node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1089     } else {
1090       node.client = "";
1091     }
1092     return node;
1093   }
1094 
1095   protected IntegrationTestingUtility util;
1096 
1097   @Override
1098   public void setUpCluster() throws Exception {
1099     util = getTestingUtil(getConf());
1100     boolean isDistributed = util.isDistributedCluster();
1101     util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1102     if (!isDistributed) {
1103       util.startMiniMapReduceCluster();
1104     }
1105     this.setConf(util.getConfiguration());
1106   }
1107 
1108   @Override
1109   public void cleanUpCluster() throws Exception {
1110     super.cleanUpCluster();
1111     if (util.isDistributedCluster()) {
1112       util.shutdownMiniMapReduceCluster();
1113     }
1114   }
1115 
1116   @Test
1117   public void testContinuousIngest() throws Exception {
1118     //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1119     int ret = ToolRunner.run(getTestingUtil(getConf()).getConfiguration(), new Loop(),
1120         new String[] {"1", "1", "2000000",
1121                      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1"});
1122     org.junit.Assert.assertEquals(0, ret);
1123   }
1124 
1125   private void usage() {
1126     System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1127     printCommands();
1128   }
1129 
1130   private void printCommands() {
1131     System.err.println("Commands:");
1132     System.err.println(" Generator  Map only job that generates data.");
1133     System.err.println(" Verify     A map reduce job that looks for holes. Look at the counts ");
1134     System.err.println("            after running. See REFERENCED and UNREFERENCED are ok. Any ");
1135     System.err.println("            UNDEFINED counts are bad. Do not run with the Generator.");
1136     System.err.println(" Walker     " +
1137       "Standalong program that starts following a linked list & emits timing info.");
1138     System.err.println(" Print      Standalone program that prints nodes in the linked list.");
1139     System.err.println(" Delete     Standalone program that deletes a·single node.");
1140     System.err.println(" Loop       Program to Loop through Generator and Verify steps");
1141     System.err.println(" Clean      Program to clean all left over detritus.");
1142     System.err.flush();
1143   }
1144 
1145   @Override
1146   protected void processOptions(CommandLine cmd) {
1147     super.processOptions(cmd);
1148     String[] args = cmd.getArgs();
1149     //get the class, run with the conf
1150     if (args.length < 1) {
1151       printUsage(this.getClass().getSimpleName() +
1152         " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1153       printCommands();
1154       throw new RuntimeException("Incorrect Number of args.");
1155     }
1156     toRun = args[0];
1157     otherArgs = Arrays.copyOfRange(args, 1, args.length);
1158   }
1159 
1160   @Override
1161   public int runTestFromCommandLine() throws Exception {
1162 
1163     Tool tool = null;
1164     if (toRun.equalsIgnoreCase("Generator")) {
1165       tool = new Generator();
1166     } else if (toRun.equalsIgnoreCase("Verify")) {
1167       tool = new Verify();
1168     } else if (toRun.equalsIgnoreCase("Loop")) {
1169       Loop loop = new Loop();
1170       loop.it = this;
1171       tool = loop;
1172     } else if (toRun.equalsIgnoreCase("Walker")) {
1173       tool = new Walker();
1174     } else if (toRun.equalsIgnoreCase("Print")) {
1175       tool = new Print();
1176     } else if (toRun.equalsIgnoreCase("Delete")) {
1177       tool = new Delete();
1178     } else if (toRun.equalsIgnoreCase("Clean")) {
1179       tool = new Clean();
1180     } else {
1181       usage();
1182       throw new RuntimeException("Unknown arg");
1183     }
1184 
1185     return ToolRunner.run(getConf(), tool, otherArgs);
1186   }
1187 
1188   @Override
1189   public String getTablename() {
1190     Configuration c = getConf();
1191     return c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME);
1192   }
1193 
1194   @Override
1195   protected Set<String> getColumnFamilies() {
1196     return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1197   }
1198 
1199   private static void setJobConf(Job job, int numMappers, long numNodes,
1200       Integer width, Integer wrapMultiplier) {
1201     job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1202     job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1203     if (width != null) {
1204       job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1205     }
1206     if (wrapMultiplier != null) {
1207       job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1208     }
1209   }
1210 
1211   public static void setJobScannerConf(Job job) {
1212     // Make sure scanners log something useful to make debugging possible.
1213     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1214     job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1215   }
1216 
1217   public static void main(String[] args) throws Exception {
1218     Configuration conf = HBaseConfiguration.create();
1219     IntegrationTestingUtility.setUseDistributedCluster(conf);
1220     int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1221     System.exit(ret);
1222   }
1223 }