/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
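
/**
 * This tests the TableInputFormat and its recovery semantics
 */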
@Category(LargeTests.class)
public class TestTableInputFormat {

  private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static MiniMRCluster mrCluster;
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    mrCluster = UTIL.startMiniMapReduceCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniMapReduceCluster();
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }
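
  /**
   * Setup a table with two rows and values.
   * @param tableName the name of the table to create
   * @return an HTable instance for the created table
   * @throws IOException
   */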
  public static HTable createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }
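
  /**
   * Setup a table with two rows and values per column family.
   * @param tableName the name of the table to create
   * @param families the column families to create
   * @return an HTable instance for the created table
   * @throws IOException
   */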
  public static HTable createTable(byte[] tableName, byte[][] families) throws IOException {
    HTable table = UTIL.createTable(tableName, families);
    Put p = new Put("aaa".getBytes());
    for (byte[] family : families) {
      p.add(family, null, "value aaa".getBytes());
    }
    table.put(p);
    p = new Put("bbb".getBytes());
    for (byte[] family : families) {
      p.add(family, null, "value bbb".getBytes());
    }
    table.put(p);
    return table;
  }
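
  /**
   * Verify that the result and key have expected values.
   * @param r single row result
   * @param key the row key
   * @param expectedKey the expected key
   * @param expectedValue the expected value
   * @return true if the assertions pass
   */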
  static boolean checkResult(Result r, ImmutableBytesWritable key,
      byte[] expectedKey, byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true;
  }
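
  /**
   * Create table data and run tests on specified htable using the
   * o.a.h.hbase.mapreduce API.
   * @param table the table to scan
   * @throws IOException
   * @throws InterruptedException
   */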
  static void runTestMapreduce(HTable table) throws IOException,
      InterruptedException {
    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
        new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
    Scan s = new Scan();
    s.setStartRow("aaa".getBytes());
    s.setStopRow("zzz".getBytes());
    s.addFamily(FAMILY);
    trr.setScan(s);
    trr.setHTable(table);

    trr.initialize(null, null);
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());

    more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());

    // no more data
    more = trr.nextKeyValue();
    assertFalse(more);
  }
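
  /**
   * Create a table that IOE's on first scanner next call.
   * @throws IOException
   */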
  static HTable createIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate an IOException on the scanner's first next() call
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    HTable htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }
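
  /**
   * Create a table that throws a DoNotRetryIOException on first scanner next call.
   * @throws IOException
   */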
  static HTable createDNRIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // build up a mock scanner stuff to fail the first time
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocation return the busted mock scanner
        if (cnt++ < failCnt) {
          // create mock ResultScanner that always fails.
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
          ResultScanner scanner = mock(ResultScanner.class);

          invocation.callRealMethod(); // simulate a scanner timeout
          doThrow(
              new UnknownScannerException("Injected simulated TimeoutException"))
              .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner.
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    HTable htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }
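
  /**
   * Run test assuming no errors using newer mapreduce api.
   * @throws IOException
   * @throws InterruptedException
   */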
  @Test
  public void testTableRecordReaderMapreduce() throws IOException,
      InterruptedException {
    HTable table = createTable("table1-mr".getBytes());
    runTestMapreduce(table);
  }
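
  /**
   * Run test assuming Scanner IOException failure using newer mapreduce api.
   * @throws IOException
   * @throws InterruptedException
   */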
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException,
      InterruptedException {
    HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1);
    runTestMapreduce(htable);
  }
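
  /**
   * Run test assuming Scanner IOException failure twice, which exhausts the
   * single scanner restart and should propagate as an IOException, using
   * newer mapreduce api.
   * @throws IOException
   * @throws InterruptedException
   */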
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
      InterruptedException {
    HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2);
    runTestMapreduce(htable);
  }
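
  /**
   * Run test assuming UnknownScannerException (which is a type of
   * DoNotRetryIOException) on the first scanner next call, using newer
   * mapreduce api.
   * @throws IOException
   * @throws InterruptedException
   */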
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
      throws IOException, InterruptedException {
    HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
    runTestMapreduce(htable);
  }
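
  /**
   * Run test assuming UnknownScannerException (which is a type of
   * DoNotRetryIOException) thrown twice; the second failure should propagate
   * as a DoNotRetryIOException, using newer mapreduce api.
   * @throws InterruptedException
   * @throws org.apache.hadoop.hbase.DoNotRetryIOException
   */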
  @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
      throws IOException, InterruptedException {
    HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
    runTestMapreduce(htable);
  }
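
  /**
   * Verify the example we present in javadocs on TableInputFormatBase.
   */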
  @Test
  public void testExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends TableInputFormatBase");
    final HTable htable = createTable(Bytes.toBytes("exampleTable"),
        new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });

    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(ExampleTIF.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
    assertEquals("Saw instances of the filtered-out row.", 0, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
  }

  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException {
      for (Cell cell : value.listCells()) {
        context.getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
            .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
            .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
            .increment(1L);
      }
    }

  }

  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        HTable exampleTable = new HTable(HBaseConfiguration.create(job),
            Bytes.toBytes("exampleTable"));
        // hand the table to the input format (required)
        setHTable(exampleTable);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
            Bytes.toBytes("columnB") };
        // restrict the scan to the example column families and rows starting with "aa"
        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }

}