View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.util.HashSet;
21  import java.util.Set;
22  import java.util.concurrent.atomic.AtomicLong;
23  
24  import org.apache.commons.lang.math.RandomUtils;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HRegionLocation;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.HTable;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
34  
/** Creates multiple threads that read and verify previously written data */
public class MultiThreadedReader extends MultiThreadedAction
{
  private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);

  /** Reader threads; populated by {@link #addReaderThreads(int)}. */
  protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
  /** Percentage (0-100) of read keys whose contents are also verified. */
  private final double verifyPercent;
  /** Set once more than {@link #maxErrors} read errors accumulate; stops all readers. */
  protected volatile boolean aborted;

  /** Writer to pace against in a concurrent writer/reader run; null when reading standalone. */
  protected MultiThreadedWriterBase writer = null;

  /**
   * The number of keys verified in a sequence. This will never be larger than
   * the total number of keys in the range. The reader might also verify
   * random keys when it catches up with the writer.
   */
  private final AtomicLong numUniqueKeysVerified = new AtomicLong();

  /**
   * Default maximum number of read errors to tolerate before shutting down all
   * readers.
   */
  public static final int DEFAULT_MAX_ERRORS = 10;

  /**
   * Default "window" size between the last key written by the writer and the
   * key that we attempt to read. The lower this number, the stricter our
   * testing is. If this is zero, we always attempt to read the highest key
   * in the contiguous sequence of keys written by the writers.
   */
  public static final int DEFAULT_KEY_WINDOW = 0;

  /** Number of reads whose results were verified (non-empty result, verify requested). */
  protected AtomicLong numKeysVerified = new AtomicLong(0);
  /** Number of verification errors (out-of-range keys or failed result verification). */
  protected AtomicLong numReadErrors = new AtomicLong(0);
  /** Number of reads that failed with an IOException. */
  protected AtomicLong numReadFailures = new AtomicLong(0);
  /** Number of empty results seen where an empty (null) result was expected. */
  protected AtomicLong nullResult = new AtomicLong(0);

  /** Read-error budget before aborting; see {@link #DEFAULT_MAX_ERRORS}. */
  private int maxErrors = DEFAULT_MAX_ERRORS;
  /** Lag to keep behind the writer; see {@link #DEFAULT_KEY_WINDOW}. */
  private int keyWindow = DEFAULT_KEY_WINDOW;
  /**
   * @param dataGen generates deterministic keys/values for reading and verification
   * @param conf cluster configuration used to create table connections
   * @param tableName table to read from
   * @param verifyPercent percentage (0-100) of reads to verify against the generator
   */
  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double verifyPercent) {
    super(dataGen, conf, tableName, "R");
    this.verifyPercent = verifyPercent;
  }
80  
  /**
   * Pairs this reader with a concurrent writer so that readers only attempt
   * keys the writer has already written. Also turns on key tracking in the
   * writer, which the pacing logic depends on.
   */
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }
85  
  /** Sets the number of read errors tolerated before all readers abort. */
  public void setMaxErrors(int maxErrors) {
    this.maxErrors = maxErrors;
  }
89  
  /** Sets the lag (in keys) to keep behind the writer's last written key. */
  public void setKeyWindow(int keyWindow) {
    this.keyWindow = keyWindow;
  }
93  
  /**
   * Starts {@code numThreads} reader threads over the half-open key range
   * [startKey, endKey).
   */
  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
    if (verbose) {
      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
    }

    addReaderThreads(numThreads);
    startThreads(readers);
  }
104 
105   protected void addReaderThreads(int numThreads) throws IOException {
106     for (int i = 0; i < numThreads; ++i) {
107       HBaseReaderThread reader = createReaderThread(i);
108       readers.add(reader);
109     }
110   }
111 
  /** Factory hook for reader threads; subclasses may return a specialized thread. */
  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
    return new HBaseReaderThread(readerId);
  }
115 
  /** A single reader thread; owns its own HTable connection. */
  public class HBaseReaderThread extends Thread {
    /** Identifier of this thread; also selects which keys it reads (key % numThreads). */
    protected final int readerId;
    /** Per-thread table connection, closed when the thread finishes. */
    protected final HTable table;

    /** The "current" key being read. Increases from startKey to endKey. */
    private long curKey;

    /** Time when the thread started */
    protected long startTimeMs;

    /** If we are ahead of the writer and reading a random key. */
    private boolean readingRandomKey;

    /** Cleared after the first read failure so only one stack trace is logged. */
    private boolean printExceptionTrace = true;
130 
    /**
     * @param readerId only the keys with this remainder from division by
     *          {@link #numThreads} will be read by this thread
     * @throws IOException if the table connection cannot be created
     */
    public HBaseReaderThread(int readerId) throws IOException {
      this.readerId = readerId;
      table = createTable();
      setName(getClass().getSimpleName() + "_" + readerId);
    }
140 
    /** Creates this thread's table connection; overridable for testing/customization. */
    protected HTable createTable() throws IOException {
      return new HTable(conf, tableName);
    }
144 
    /**
     * Thread entry point: reads keys until done or aborted, then releases the
     * table connection and decrements the shared worker count (used by the
     * parent to detect completion) even if the read loop throws.
     */
    @Override
    public void run() {
      try {
        runReader();
      } finally {
        closeTable();
        numThreadsWorking.decrementAndGet();
      }
    }
154 
155     protected void closeTable() {
156       try {
157         if (table != null) {
158           table.close();
159         }
160       } catch (IOException e) {
161         LOG.error("Error closing table", e);
162       }
163     }
164 
    /**
     * Main read loop: walks keys from startKey towards endKey, skipping keys
     * owned by other threads and keys the writer failed to write, until the
     * range is exhausted or {@link #aborted} is set.
     */
    private void runReader() {
      if (verbose) {
        LOG.info("Started thread #" + readerId + " for reads...");
      }

      startTimeMs = System.currentTimeMillis();
      curKey = startKey;
      while (curKey < endKey && !aborted) {
        long k = getNextKeyToRead();

        // A sanity check for the key range.
        if (k < startKey || k >= endKey) {
          numReadErrors.incrementAndGet();
          throw new AssertionError("Load tester logic error: proposed key " +
              "to read " + k + " is out of range (startKey=" + startKey +
              ", endKey=" + endKey + ")");
        }

        if (k % numThreads != readerId ||
            writer != null && writer.failedToWriteKey(k)) {
          // Skip keys that this thread should not read, as well as the keys
          // that we know the writer failed to write.
          continue;
        }

        readKey(k);
        // Only sequential keys count as "unique": k == curKey - 1 means
        // getNextKeyToRead() just advanced curKey past k.
        if (k == curKey - 1 && !readingRandomKey) {
          // We have verified another unique key.
          numUniqueKeysVerified.incrementAndGet();
        }
      }
    }
197 
198     /**
199      * Should only be used for the concurrent writer/reader workload. The
200      * maximum key we are allowed to read, subject to the "key window"
201      * constraint.
202      */
203     private long maxKeyWeCanRead() {
204       long insertedUpToKey = writer.wroteUpToKey();
205       if (insertedUpToKey >= endKey - 1) {
206         // The writer has finished writing our range, so we can read any
207         // key in the range.
208         return endKey - 1;
209       }
210       return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
211     }
212 
213     protected long getNextKeyToRead() {
214       readingRandomKey = false;
215       if (writer == null || curKey <= maxKeyWeCanRead()) {
216         return curKey++;
217       }
218 
219       // We caught up with the writer. See if we can read any keys at all.
220       long maxKeyToRead;
221       while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
222         // The writer has not written sufficient keys for us to be able to read
223         // anything at all. Sleep a bit. This should only happen in the
224         // beginning of a load test run.
225         Threads.sleepWithoutInterrupt(50);
226       }
227 
228       if (curKey <= maxKeyToRead) {
229         // The writer wrote some keys, and we are now allowed to read our
230         // current key.
231         return curKey++;
232       }
233 
234       // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
235       // Don't increment the current key -- we still have to try reading it
236       // later. Set a flag to make sure that we don't count this key towards
237       // the set of unique keys we have verified.
238       readingRandomKey = true;
239       return startKey + Math.abs(RandomUtils.nextLong())
240           % (maxKeyToRead - startKey + 1);
241     }
242 
243     private Get readKey(long keyToRead) {
244       Get get = null;
245       try {
246         get = createGet(keyToRead);
247         queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
248       } catch (IOException e) {
249         numReadFailures.addAndGet(1);
250         LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
251             + ", time from start: "
252             + (System.currentTimeMillis() - startTimeMs) + " ms");
253         if (printExceptionTrace) {
254           LOG.warn(e);
255           printExceptionTrace = false;
256         }
257       }
258       return get;
259     }
260 
261     protected Get createGet(long keyToRead) throws IOException {
262       Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
263       String cfsString = "";
264       byte[][] columnFamilies = dataGenerator.getColumnFamilies();
265       for (byte[] cf : columnFamilies) {
266         get.addFamily(cf);
267         if (verbose) {
268           if (cfsString.length() > 0) {
269             cfsString += ", ";
270           }
271           cfsString += "[" + Bytes.toStringBinary(cf) + "]";
272         }
273       }
274       get = dataGenerator.beforeGet(keyToRead, get);
275       if (verbose) {
276         LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
277       }
278       return get;
279     }
280 
    /**
     * Issues the Get, times it, and hands the result to
     * {@link #verifyResultsAndUpdateMetrics} for metrics and optional
     * verification.
     *
     * @param get the Get to issue
     * @param verify whether to verify the result against the data generator
     * @param keyToRead the key being read (for reporting)
     */
    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
      String rowKey = Bytes.toString(get.getRow());

      // read the data
      long start = System.nanoTime();
      Result result = table.get(get);
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, table, false);
    }
290 
    /**
     * Records timing/key metrics, optionally verifies the result against the
     * data generator, and flips {@link #aborted} once the error budget
     * ({@code maxErrors}) is exceeded.
     *
     * @param verify whether to verify the result contents
     * @param rowKey row key as a string (for logging and region lookup)
     * @param elapsedNano read latency in nanoseconds
     * @param result the (possibly empty) result of the Get
     * @param table connection used to look up the region for empty results
     * @param isNullExpected if true, an empty result is counted as expected
     *          and verification is skipped
     */
    protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano,
        Result result, HTable table, boolean isNullExpected)
        throws IOException {
      // Convert nanoseconds to milliseconds for the aggregate op-time metric.
      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
      numKeys.addAndGet(1);
      if (!result.isEmpty()) {
        if (verify) {
          numKeysVerified.incrementAndGet();
        }
      } else {
        // Empty result: log which region server owns the key to aid debugging.
         HRegionLocation hloc = table.getRegionLocation(
             Bytes.toBytes(rowKey));
        LOG.info("Key = " + rowKey + ", RegionServer: "
            + hloc.getHostname());
        if(isNullExpected) {
          // Empty is acceptable here: count it and skip verification entirely.
          nullResult.incrementAndGet();
          LOG.debug("Null result obtained for the key ="+rowKey);
          return;
        }
      }
      // Verify the result (an unexpected empty result reaches here too and
      // is checked against the generator like any other).
      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
      long numErrorsAfterThis = 0;
      if (isOk) {
        long cols = 0;
        // Count the columns for reporting purposes.
        for (byte[] cf : result.getMap().keySet()) {
          cols += result.getFamilyMap(cf).size();
        }
        numCols.addAndGet(cols);
      } else {
        if (writer != null) {
          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
        }
        numErrorsAfterThis = numReadErrors.incrementAndGet();
      }

      if (numErrorsAfterThis > maxErrors) {
        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
        aborted = true;
      }
    }
332   }
333 
  /** @return number of reads that failed with an IOException */
  public long getNumReadFailures() {
    return numReadFailures.get();
  }

  /** @return number of verification errors (bad or unverifiable results) */
  public long getNumReadErrors() {
    return numReadErrors.get();
  }

  /** @return number of reads whose results were verified */
  public long getNumKeysVerified() {
    return numKeysVerified.get();
  }

  /** @return number of distinct sequential keys verified (random re-reads excluded) */
  public long getNumUniqueKeysVerified() {
    return numUniqueKeysVerified.get();
  }

  /** @return number of expected-empty results observed */
  public long getNullResultsCount() {
    return nullResult.get();
  }
353 
354   @Override
355   protected String progressInfo() {
356     StringBuilder sb = new StringBuilder();
357     appendToStatus(sb, "verified", numKeysVerified.get());
358     appendToStatus(sb, "READ FAILURES", numReadFailures.get());
359     appendToStatus(sb, "READ ERRORS", numReadErrors.get());
360     appendToStatus(sb, "NULL RESULT", nullResult.get());
361     return sb.toString();
362   }
363 }