View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.commons.cli.CommandLine;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FSDataInputStream;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.client.HTable;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.client.TableSnapshotScanner;
34  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
37  import org.apache.hadoop.hbase.mapreduce.TableMapper;
38  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.FSUtils;
42  import org.apache.hadoop.io.NullWritable;
43  import org.apache.hadoop.mapreduce.Counters;
44  import org.apache.hadoop.mapreduce.Job;
45  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
46  import org.apache.hadoop.util.StringUtils;
47  import org.apache.hadoop.util.ToolRunner;
48  
49  import com.google.common.base.Stopwatch;
50  
51  /**
52   * A simple performance evaluation tool for single client and MR scans
53   * and snapshot scans.
54   */
55  public class ScanPerformanceEvaluation extends AbstractHBaseTool {
56  
57    private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
58  
59    private String type;
60    private String file;
61    private String tablename;
62    private String snapshotName;
63    private String restoreDir;
64    private String caching;
65  
66    @Override
67    public void setConf(Configuration conf) {
68      super.setConf(conf);
69      Path rootDir;
70      try {
71        rootDir = FSUtils.getRootDir(conf);
72        rootDir.getFileSystem(conf);
73      } catch (IOException ex) {
74        throw new RuntimeException(ex);
75      }
76    }
77  
78    @Override
79    protected void addOptions() {
80      this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
81      this.addOptWithArg("f", "file", "the filename to read from");
82      this.addOptWithArg("tn", "table", "the tablename to read from");
83      this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
84      this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
85      this.addOptWithArg("ch", "caching", "scanner caching value");
86    }
87  
88    @Override
89    protected void processOptions(CommandLine cmd) {
90      type = cmd.getOptionValue("type");
91      file = cmd.getOptionValue("file");
92      tablename = cmd.getOptionValue("table");
93      snapshotName = cmd.getOptionValue("snapshot");
94      restoreDir = cmd.getOptionValue("restoredir");
95      caching = cmd.getOptionValue("caching");
96    }
97  
98    protected void testHdfsStreaming(Path filename) throws IOException {
99      byte[] buf = new byte[1024];
100     FileSystem fs = filename.getFileSystem(getConf());
101 
102     // read the file from start to finish
103     Stopwatch fileOpenTimer = new Stopwatch();
104     Stopwatch streamTimer = new Stopwatch();
105 
106     fileOpenTimer.start();
107     FSDataInputStream in = fs.open(filename);
108     fileOpenTimer.stop();
109 
110     long totalBytes = 0;
111     streamTimer.start();
112     while (true) {
113       int read = in.read(buf);
114       if (read < 0) {
115         break;
116       }
117       totalBytes += read;
118     }
119     streamTimer.stop();
120 
121     double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
122 
123     System.out.println("HDFS streaming: ");
124     System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
125     System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
126     System.out.println("total bytes: " + totalBytes + " bytes ("
127         + StringUtils.humanReadableInt(totalBytes) + ")");
128     System.out.println("throghput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
129   }
130 
131   private Scan getScan() {
132     Scan scan = new Scan(); // default scan settings
133     scan.setCacheBlocks(false);
134     scan.setMaxVersions(1);
135     scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
136     if (caching != null) {
137       scan.setCaching(Integer.parseInt(caching));
138     }
139 
140     return scan;
141   }
142 
143   public void testScan() throws IOException {
144     Stopwatch tableOpenTimer = new Stopwatch();
145     Stopwatch scanOpenTimer = new Stopwatch();
146     Stopwatch scanTimer = new Stopwatch();
147 
148     tableOpenTimer.start();
149     HTable table = new HTable(getConf(), TableName.valueOf(tablename));
150     tableOpenTimer.stop();
151 
152     Scan scan = getScan();
153     scanOpenTimer.start();
154     ResultScanner scanner = table.getScanner(scan);
155     scanOpenTimer.stop();
156 
157     long numRows = 0;
158     long numCells = 0;
159     scanTimer.start();
160     while (true) {
161       Result result = scanner.next();
162       if (result == null) {
163         break;
164       }
165       numRows++;
166 
167       numCells += result.rawCells().length;
168     }
169     scanTimer.stop();
170     scanner.close();
171     table.close();
172 
173     ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
174     long totalBytes = metrics.countOfBytesInResults.get();
175     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
176     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
177     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
178 
179     System.out.println("HBase scan: ");
180     System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
181     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
182     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
183 
184     System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
185 
186     System.out.println("total bytes: " + totalBytes + " bytes ("
187         + StringUtils.humanReadableInt(totalBytes) + ")");
188     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
189     System.out.println("total rows  : " + numRows);
190     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
191     System.out.println("total cells : " + numCells);
192     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
193   }
194 
195 
196   public void testSnapshotScan() throws IOException {
197     Stopwatch snapshotRestoreTimer = new Stopwatch();
198     Stopwatch scanOpenTimer = new Stopwatch();
199     Stopwatch scanTimer = new Stopwatch();
200 
201     Path restoreDir = new Path(this.restoreDir);
202 
203     snapshotRestoreTimer.start();
204     restoreDir.getFileSystem(conf).delete(restoreDir, true);
205     snapshotRestoreTimer.stop();
206 
207     Scan scan = getScan();
208     scanOpenTimer.start();
209     TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
210     scanOpenTimer.stop();
211 
212     long numRows = 0;
213     long numCells = 0;
214     scanTimer.start();
215     while (true) {
216       Result result = scanner.next();
217       if (result == null) {
218         break;
219       }
220       numRows++;
221 
222       numCells += result.rawCells().length;
223     }
224     scanTimer.stop();
225     scanner.close();
226 
227     ScanMetrics metrics = scanner.getScanMetrics();
228     long totalBytes = metrics.countOfBytesInResults.get();
229     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
230     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
231     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
232 
233     System.out.println("HBase scan snapshot: ");
234     System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
235     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
236     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
237 
238     System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
239 
240     System.out.println("total bytes: " + totalBytes + " bytes ("
241         + StringUtils.humanReadableInt(totalBytes) + ")");
242     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
243     System.out.println("total rows  : " + numRows);
244     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
245     System.out.println("total cells : " + numCells);
246     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
247 
248   }
249 
250   public static enum ScanCounter {
251     NUM_ROWS,
252     NUM_CELLS,
253   }
254 
255   public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
256     @Override
257     protected void map(ImmutableBytesWritable key, Result value,
258         Context context) throws IOException,
259         InterruptedException {
260       context.getCounter(ScanCounter.NUM_ROWS).increment(1);
261       context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
262     }
263   }
264 
265   public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
266     Stopwatch scanOpenTimer = new Stopwatch();
267     Stopwatch scanTimer = new Stopwatch();
268 
269     Scan scan = getScan();
270 
271     String jobName = "testScanMapReduce";
272 
273     Job job = new Job(conf);
274     job.setJobName(jobName);
275 
276     job.setJarByClass(getClass());
277 
278     TableMapReduceUtil.initTableMapperJob(
279         this.tablename,
280         scan,
281         MyMapper.class,
282         NullWritable.class,
283         NullWritable.class,
284         job
285     );
286 
287     job.setNumReduceTasks(0);
288     job.setOutputKeyClass(NullWritable.class);
289     job.setOutputValueClass(NullWritable.class);
290     job.setOutputFormatClass(NullOutputFormat.class);
291 
292     scanTimer.start();
293     job.waitForCompletion(true);
294     scanTimer.stop();
295 
296     Counters counters = job.getCounters();
297     long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
298     long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
299 
300     long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
301     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
302     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
303     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
304 
305     System.out.println("HBase scan mapreduce: ");
306     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
307     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
308 
309     System.out.println("total bytes: " + totalBytes + " bytes ("
310         + StringUtils.humanReadableInt(totalBytes) + ")");
311     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
312     System.out.println("total rows  : " + numRows);
313     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
314     System.out.println("total cells : " + numCells);
315     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
316   }
317 
318   public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
319     Stopwatch scanOpenTimer = new Stopwatch();
320     Stopwatch scanTimer = new Stopwatch();
321 
322     Scan scan = getScan();
323 
324     String jobName = "testSnapshotScanMapReduce";
325 
326     Job job = new Job(conf);
327     job.setJobName(jobName);
328 
329     job.setJarByClass(getClass());
330 
331     TableMapReduceUtil.initTableSnapshotMapperJob(
332         this.snapshotName,
333         scan,
334         MyMapper.class,
335         NullWritable.class,
336         NullWritable.class,
337         job,
338         true,
339         new Path(restoreDir)
340     );
341 
342     job.setNumReduceTasks(0);
343     job.setOutputKeyClass(NullWritable.class);
344     job.setOutputValueClass(NullWritable.class);
345     job.setOutputFormatClass(NullOutputFormat.class);
346 
347     scanTimer.start();
348     job.waitForCompletion(true);
349     scanTimer.stop();
350 
351     Counters counters = job.getCounters();
352     long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
353     long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
354 
355     long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
356     double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
357     double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
358     double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
359 
360     System.out.println("HBase scan mapreduce: ");
361     System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
362     System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
363 
364     System.out.println("total bytes: " + totalBytes + " bytes ("
365         + StringUtils.humanReadableInt(totalBytes) + ")");
366     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
367     System.out.println("total rows  : " + numRows);
368     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
369     System.out.println("total cells : " + numCells);
370     System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
371   }
372 
373   @Override
374   protected int doWork() throws Exception {
375     if (type.equals("streaming")) {
376       testHdfsStreaming(new Path(file));
377     } else if (type.equals("scan")){
378       testScan();
379     } else if (type.equals("snapshotscan")) {
380       testSnapshotScan();
381     } else if (type.equals("scanmapreduce")) {
382       testScanMapReduce();
383     } else if (type.equals("snapshotscanmapreduce")) {
384       testSnapshotScanMapReduce();
385     }
386     return 0;
387   }
388 
389   public static void main (String[] args) throws Exception {
390     int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
391     System.exit(ret);
392   }
393 }