1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.Random;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.commons.math.random.RandomData;
27 import org.apache.commons.math.random.RandomDataImpl;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32 import org.apache.hadoop.io.MapFile;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.io.WritableComparable;
35
36
37
38
39
40
41 public class MapFilePerformanceEvaluation {
42 protected final Configuration conf;
43 private static final int ROW_LENGTH = 10;
44 private static final int ROW_COUNT = 100000;
45
46 static final Log LOG =
47 LogFactory.getLog(MapFilePerformanceEvaluation.class.getName());
48
49
50
51
/**
 * Creates an evaluation driver bound to the given configuration.
 *
 * @param c configuration stored in {@link #conf}
 */
public MapFilePerformanceEvaluation(final Configuration c) {
  // The implicit Object constructor call is sufficient; only the field is set.
  this.conf = c;
}
56
57 static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
58 String v = Integer.toString(i);
59 w.set(Bytes.toBytes("0000000000".substring(v.length()) + v));
60 return w;
61 }
62
63 private void runBenchmarks() throws Exception {
64 final FileSystem fs = FileSystem.get(this.conf);
65 final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
66 if (fs.exists(mf)) {
67 fs.delete(mf, true);
68 }
69 runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
70 ROW_COUNT);
71
72 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
73 public void run() {
74 try {
75 runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
76 ROW_COUNT);
77 } catch (Exception e) {
78 e.printStackTrace();
79 }
80 }
81 });
82 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
83 public void run() {
84 try {
85 runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
86 ROW_COUNT);
87 } catch (Exception e) {
88 e.printStackTrace();
89 }
90 }
91 });
92 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
93 public void run() {
94 try {
95 runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
96 ROW_COUNT);
97 } catch (Exception e) {
98 e.printStackTrace();
99 }
100 }
101 });
102 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
103 public void run() {
104 try {
105 runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
106 ROW_COUNT);
107 } catch (Exception e) {
108 e.printStackTrace();
109 }
110 }
111 });
112 }
113
114 protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
115 throws Exception {
116 LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
117 rowCount + " rows.");
118 long elapsedTime = benchmark.run();
119 LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
120 rowCount + " rows took " + elapsedTime + "ms.");
121 }
122
123 static abstract class RowOrientedBenchmark {
124
125 protected final Configuration conf;
126 protected final FileSystem fs;
127 protected final Path mf;
128 protected final int totalRows;
129
130 public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
131 int totalRows) {
132 this.conf = conf;
133 this.fs = fs;
134 this.mf = mf;
135 this.totalRows = totalRows;
136 }
137
138 void setUp() throws Exception {
139
140 }
141
142 abstract void doRow(int i) throws Exception;
143
144 protected int getReportingPeriod() {
145 return this.totalRows / 10;
146 }
147
148 void tearDown() throws Exception {
149
150 }
151
152
153
154
155
156
157 long run() throws Exception {
158 long elapsedTime;
159 setUp();
160 long startTime = System.currentTimeMillis();
161 try {
162 for (int i = 0; i < totalRows; i++) {
163 if (i > 0 && i % getReportingPeriod() == 0) {
164 LOG.info("Processed " + i + " rows.");
165 }
166 doRow(i);
167 }
168 elapsedTime = System.currentTimeMillis() - startTime;
169 } finally {
170 tearDown();
171 }
172 return elapsedTime;
173 }
174
175 }
176
177 static class SequentialWriteBenchmark extends RowOrientedBenchmark {
178
179 protected MapFile.Writer writer;
180 private Random random = new Random();
181 private byte[] bytes = new byte[ROW_LENGTH];
182 private ImmutableBytesWritable key = new ImmutableBytesWritable();
183 private ImmutableBytesWritable value = new ImmutableBytesWritable();
184
185 public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
186 int totalRows) {
187 super(conf, fs, mf, totalRows);
188 }
189
190 @Override
191 void setUp() throws Exception {
192 writer = new MapFile.Writer(conf, fs, mf.toString(),
193 ImmutableBytesWritable.class, ImmutableBytesWritable.class);
194 }
195
196 @Override
197 void doRow(int i) throws Exception {
198 value.set(generateValue());
199 writer.append(format(i, key), value);
200 }
201
202 private byte[] generateValue() {
203 random.nextBytes(bytes);
204 return bytes;
205 }
206
207 @Override
208 protected int getReportingPeriod() {
209 return this.totalRows;
210 }
211
212 @Override
213 void tearDown() throws Exception {
214 writer.close();
215 }
216
217 }
218
219 static abstract class ReadBenchmark extends RowOrientedBenchmark {
220 ImmutableBytesWritable key = new ImmutableBytesWritable();
221 ImmutableBytesWritable value = new ImmutableBytesWritable();
222
223 protected MapFile.Reader reader;
224
225 public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
226 int totalRows) {
227 super(conf, fs, mf, totalRows);
228 }
229
230 @Override
231 void setUp() throws Exception {
232 reader = new MapFile.Reader(fs, mf.toString(), conf);
233 }
234
235 @Override
236 void tearDown() throws Exception {
237 reader.close();
238 }
239
240 }
241
242 static class SequentialReadBenchmark extends ReadBenchmark {
243 ImmutableBytesWritable verify = new ImmutableBytesWritable();
244
245 public SequentialReadBenchmark(Configuration conf, FileSystem fs,
246 Path mf, int totalRows) {
247 super(conf, fs, mf, totalRows);
248 }
249
250 @Override
251 void doRow(int i) throws Exception {
252 this.reader.next(key, value);
253 PerformanceEvaluationCommons.assertKey(this.key.get(),
254 format(i, this.verify).get());
255 PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, value.getLength());
256 }
257
258 @Override
259 protected int getReportingPeriod() {
260 return this.totalRows;
261 }
262
263 }
264
265 static class UniformRandomReadBenchmark extends ReadBenchmark {
266
267 private Random random = new Random();
268
269 public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
270 Path mf, int totalRows) {
271 super(conf, fs, mf, totalRows);
272 }
273
274 @Override
275 void doRow(int i) throws Exception {
276 ImmutableBytesWritable k = getRandomRow();
277 ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
278 PerformanceEvaluationCommons.assertValueSize(r.getLength(), ROW_LENGTH);
279 }
280
281 private ImmutableBytesWritable getRandomRow() {
282 return format(random.nextInt(totalRows), key);
283 }
284
285 }
286
287 static class UniformRandomSmallScan extends ReadBenchmark {
288 private Random random = new Random();
289
290 public UniformRandomSmallScan(Configuration conf, FileSystem fs,
291 Path mf, int totalRows) {
292 super(conf, fs, mf, totalRows/10);
293 }
294
295 @Override
296 void doRow(int i) throws Exception {
297 ImmutableBytesWritable ibw = getRandomRow();
298 WritableComparable<?> wc = this.reader.getClosest(ibw, this.value);
299 if (wc == null) {
300 throw new NullPointerException();
301 }
302 PerformanceEvaluationCommons.assertKey(ibw.get(),
303 ((ImmutableBytesWritable)wc).get());
304
305 for (int ii = 0; ii < 29; ii++) {
306 this.reader.next(this.key, this.value);
307 PerformanceEvaluationCommons.assertValueSize(this.value.getLength(), ROW_LENGTH);
308 }
309 }
310
311 private ImmutableBytesWritable getRandomRow() {
312 return format(random.nextInt(totalRows), key);
313 }
314 }
315
316 static class GaussianRandomReadBenchmark extends ReadBenchmark {
317 private RandomData randomData = new RandomDataImpl();
318
319 public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
320 Path mf, int totalRows) {
321 super(conf, fs, mf, totalRows);
322 }
323
324 @Override
325 void doRow(int i) throws Exception {
326 ImmutableBytesWritable k = getGaussianRandomRow();
327 ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
328 PerformanceEvaluationCommons.assertValueSize(r.getLength(), ROW_LENGTH);
329 }
330
331 private ImmutableBytesWritable getGaussianRandomRow() {
332 int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
333 (double)totalRows / 10.0);
334 return format(r, key);
335 }
336
337 }
338
339
340
341
342
343
344 public static void main(String[] args) throws Exception {
345 new MapFilePerformanceEvaluation(HBaseConfiguration.create()).
346 runBenchmarks();
347 }
348 }