1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Matchers.anyObject;
25 import static org.mockito.Mockito.doAnswer;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.spy;
30
31 import java.io.IOException;
32 import java.util.Arrays;
33 import java.util.Map;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.*;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.ResultScanner;
43 import org.apache.hadoop.hbase.client.Scan;
44 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
45 import org.apache.hadoop.hbase.filter.Filter;
46 import org.apache.hadoop.hbase.filter.RegexStringComparator;
47 import org.apache.hadoop.hbase.filter.RowFilter;
48 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
49 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
50 import org.apache.hadoop.hbase.testclassification.LargeTests;
51 import org.apache.hadoop.hbase.util.Bytes;
52 import org.apache.hadoop.io.Text;
53 import org.apache.hadoop.mapred.JobClient;
54 import org.apache.hadoop.mapred.JobConf;
55 import org.apache.hadoop.mapred.JobConfigurable;
56 import org.apache.hadoop.mapred.MiniMRCluster;
57 import org.apache.hadoop.mapred.OutputCollector;
58 import org.apache.hadoop.mapred.Reporter;
59 import org.apache.hadoop.mapred.RunningJob;
60 import org.apache.hadoop.mapred.lib.NullOutputFormat;
61 import org.junit.AfterClass;
62 import org.junit.Before;
63 import org.junit.BeforeClass;
64 import org.junit.Test;
65 import org.junit.experimental.categories.Category;
66 import org.mockito.invocation.InvocationOnMock;
67 import org.mockito.stubbing.Answer;
68
69
70
71
72
73 @Category(LargeTests.class)
74 public class TestTableInputFormat {
75
76 private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
77
78 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
79 private static MiniMRCluster mrCluster;
80 static final byte[] FAMILY = Bytes.toBytes("family");
81
82 private static final byte[][] columns = new byte[][] { FAMILY };
83
84 @BeforeClass
85 public static void beforeClass() throws Exception {
86 UTIL.startMiniCluster();
87 mrCluster = UTIL.startMiniMapReduceCluster();
88 }
89
90 @AfterClass
91 public static void afterClass() throws Exception {
92 UTIL.shutdownMiniMapReduceCluster();
93 UTIL.shutdownMiniCluster();
94 }
95
96 @Before
97 public void before() throws IOException {
98 LOG.info("before");
99 UTIL.ensureSomeRegionServersAvailable(1);
100 LOG.info("before done");
101 }
102
103
104
105
106
107
108
109
110 public static HTable createTable(byte[] tableName) throws IOException {
111 return createTable(tableName, new byte[][] { FAMILY });
112 }
113
114
115
116
117
118
119
120
121 public static HTable createTable(byte[] tableName, byte[][] families) throws IOException {
122 HTable table = UTIL.createTable(tableName, families);
123 Put p = new Put("aaa".getBytes());
124 for (byte[] family : families) {
125 p.add(family, null, "value aaa".getBytes());
126 }
127 table.put(p);
128 p = new Put("bbb".getBytes());
129 for (byte[] family : families) {
130 p.add(family, null, "value bbb".getBytes());
131 }
132 table.put(p);
133 return table;
134 }
135
136
137
138
139
140
141
142
143
144
145 static boolean checkResult(Result r, ImmutableBytesWritable key,
146 byte[] expectedKey, byte[] expectedValue) {
147 assertEquals(0, key.compareTo(expectedKey));
148 Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
149 byte[] value = vals.values().iterator().next();
150 assertTrue(Arrays.equals(value, expectedValue));
151 return true;
152 }
153
154
155
156
157
158
159
160
161 static void runTestMapred(HTable table) throws IOException {
162 org.apache.hadoop.hbase.mapred.TableRecordReader trr =
163 new org.apache.hadoop.hbase.mapred.TableRecordReader();
164 trr.setStartRow("aaa".getBytes());
165 trr.setEndRow("zzz".getBytes());
166 trr.setHTable(table);
167 trr.setInputColumns(columns);
168
169 trr.init();
170 Result r = new Result();
171 ImmutableBytesWritable key = new ImmutableBytesWritable();
172
173 boolean more = trr.next(key, r);
174 assertTrue(more);
175 checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
176
177 more = trr.next(key, r);
178 assertTrue(more);
179 checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
180
181
182 more = trr.next(key, r);
183 assertFalse(more);
184 }
185
186
187
188
189
190
191 static HTable createIOEScannerTable(byte[] name, final int failCnt)
192 throws IOException {
193
194 Answer<ResultScanner> a = new Answer<ResultScanner>() {
195 int cnt = 0;
196
197 @Override
198 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
199
200 if (cnt++ < failCnt) {
201
202 Scan scan = mock(Scan.class);
203 doReturn("bogus".getBytes()).when(scan).getStartRow();
204 ResultScanner scanner = mock(ResultScanner.class);
205
206 doThrow(new IOException("Injected exception")).when(scanner).next();
207 return scanner;
208 }
209
210
211 return (ResultScanner) invocation.callRealMethod();
212 }
213 };
214
215 HTable htable = spy(createTable(name));
216 doAnswer(a).when(htable).getScanner((Scan) anyObject());
217 return htable;
218 }
219
220
221
222
223
224
225
226 static HTable createDNRIOEScannerTable(byte[] name, final int failCnt)
227 throws IOException {
228
229 Answer<ResultScanner> a = new Answer<ResultScanner>() {
230 int cnt = 0;
231
232 @Override
233 public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
234
235 if (cnt++ < failCnt) {
236
237 Scan scan = mock(Scan.class);
238 doReturn("bogus".getBytes()).when(scan).getStartRow();
239 ResultScanner scanner = mock(ResultScanner.class);
240
241 invocation.callRealMethod();
242 doThrow(
243 new UnknownScannerException("Injected simulated TimeoutException"))
244 .when(scanner).next();
245 return scanner;
246 }
247
248
249 return (ResultScanner) invocation.callRealMethod();
250 }
251 };
252
253 HTable htable = spy(createTable(name));
254 doAnswer(a).when(htable).getScanner((Scan) anyObject());
255 return htable;
256 }
257
258
259
260
261
262
263 @Test
264 public void testTableRecordReader() throws IOException {
265 HTable table = createTable("table1".getBytes());
266 runTestMapred(table);
267 }
268
269
270
271
272
273
274 @Test
275 public void testTableRecordReaderScannerFail() throws IOException {
276 HTable htable = createIOEScannerTable("table2".getBytes(), 1);
277 runTestMapred(htable);
278 }
279
280
281
282
283
284
285 @Test(expected = IOException.class)
286 public void testTableRecordReaderScannerFailTwice() throws IOException {
287 HTable htable = createIOEScannerTable("table3".getBytes(), 2);
288 runTestMapred(htable);
289 }
290
291
292
293
294
295
296
297 @Test
298 public void testTableRecordReaderScannerTimeout() throws IOException {
299 HTable htable = createDNRIOEScannerTable("table4".getBytes(), 1);
300 runTestMapred(htable);
301 }
302
303
304
305
306
307
308
309 @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
310 public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
311 HTable htable = createDNRIOEScannerTable("table5".getBytes(), 2);
312 runTestMapred(htable);
313 }
314
315
316
317
318 @Test
319 public void testExtensionOfTableInputFormatBase() throws IOException {
320 LOG.info("testing use of an InputFormat taht extends InputFormatBase");
321 final HTable htable = createTable(Bytes.toBytes("exampleTable"),
322 new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
323 final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
324 job.setInputFormat(ExampleTIF.class);
325 job.setOutputFormat(NullOutputFormat.class);
326 job.setMapperClass(ExampleVerifier.class);
327 job.setNumReduceTasks(0);
328 LOG.debug("submitting job.");
329 final RunningJob run = JobClient.runJob(job);
330 assertTrue("job failed!", run.isSuccessful());
331 assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
332 .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
333 assertEquals("Saw any instances of the filtered out row.", 0, run.getCounters()
334 .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
335 assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
336 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
337 assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
338 .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
339 assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
340 .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
341 assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
342 .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
343 }
344
345 public static class ExampleVerifier implements TableMap<Text, Text> {
346
347 @Override
348 public void configure(JobConf conf) {
349 }
350
351 @Override
352 public void map(ImmutableBytesWritable key, Result value,
353 OutputCollector<Text,Text> output,
354 Reporter reporter) throws IOException {
355 for (Cell cell : value.listCells()) {
356 reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
357 Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
358 .increment(1l);
359 reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
360 Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
361 .increment(1l);
362 reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
363 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
364 .increment(1l);
365 }
366 }
367
368 @Override
369 public void close() {
370 }
371
372 }
373
374 public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
375
376 @Override
377 public void configure(JobConf job) {
378 try {
379 HTable exampleTable = new HTable(HBaseConfiguration.create(job),
380 Bytes.toBytes("exampleTable"));
381
382 setHTable(exampleTable);
383 byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
384 Bytes.toBytes("columnB") };
385
386 setInputColumns(inputColumns);
387 Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
388
389 setRowFilter(exampleFilter);
390 } catch (IOException exception) {
391 throw new RuntimeException("Failed to configure for job.", exception);
392 }
393 }
394
395 }
396
397 }
398