View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Matchers.anyObject;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.doReturn;
27  import static org.mockito.Mockito.doThrow;
28  import static org.mockito.Mockito.mock;
29  import static org.mockito.Mockito.spy;
30  
31  import java.io.IOException;
32  import java.util.Arrays;
33  import java.util.Map;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.*;
38  import org.apache.hadoop.hbase.client.HTable;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
44  import org.apache.hadoop.hbase.filter.Filter;
45  import org.apache.hadoop.hbase.filter.RegexStringComparator;
46  import org.apache.hadoop.hbase.filter.RowFilter;
47  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
48  import org.apache.hadoop.hbase.testclassification.LargeTests;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.io.NullWritable;
51  import org.apache.hadoop.mapred.JobConf;
52  import org.apache.hadoop.mapred.JobConfigurable;
53  import org.apache.hadoop.mapred.MiniMRCluster;
54  import org.apache.hadoop.mapreduce.Job;
55  import org.apache.hadoop.mapreduce.Mapper.Context;
56  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
57  import org.junit.AfterClass;
58  import org.junit.Before;
59  import org.junit.BeforeClass;
60  import org.junit.Test;
61  import org.junit.experimental.categories.Category;
62  import org.mockito.invocation.InvocationOnMock;
63  import org.mockito.stubbing.Answer;
64  
65  /**
66   * This tests the TableInputFormat and its recovery semantics
67   *
68   */
69  @Category(LargeTests.class)
70  public class TestTableInputFormat {
71  
      // Class-wide logger for test progress/diagnostics.
72    private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
73  
      // Shared mini-cluster fixture, started once in beforeClass().
74    private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
      // Handle to the mini MapReduce cluster; assigned in beforeClass().
      // NOTE(review): only written, never read in this file — confirm before removing.
75    private static MiniMRCluster mrCluster;
      // Default column family used by the helper tables below.
76    static final byte[] FAMILY = Bytes.toBytes("family");
77  
      // NOTE(review): 'columns' appears unused in this file — confirm before removing.
78    private static final byte[][] columns = new byte[][] { FAMILY };
79  
80    @BeforeClass
      // One-time setup: start the mini HBase cluster and a mini MapReduce
      // cluster that every test in this class shares.
81    public static void beforeClass() throws Exception {
82      UTIL.startMiniCluster();
83      mrCluster = UTIL.startMiniMapReduceCluster();
84    }
85  
86    @AfterClass
      // One-time teardown: stop the clusters in reverse order of startup
      // (MapReduce first, then HBase).
87    public static void afterClass() throws Exception {
88      UTIL.shutdownMiniMapReduceCluster();
89      UTIL.shutdownMiniCluster();
90    }
91  
92    @Before
      // Per-test guard: make sure at least one region server is up before
      // each test runs (earlier tests may have disturbed the cluster).
93    public void before() throws IOException {
94      LOG.info("before");
95      UTIL.ensureSomeRegionServersAvailable(1);
96      LOG.info("before done");
97    }
98  
99    /**
100    * Setup a table with two rows and values.
101    *
102    * @param tableName
103    * @return
104    * @throws IOException
105    */
106   public static HTable createTable(byte[] tableName) throws IOException {
107     return createTable(tableName, new byte[][] { FAMILY });
108   }
109 
110   /**
111    * Setup a table with two rows and values per column family.
112    *
113    * @param tableName
114    * @return
115    * @throws IOException
116    */
117   public static HTable createTable(byte[] tableName, byte[][] families) throws IOException {
118     HTable table = UTIL.createTable(tableName, families);
119     Put p = new Put("aaa".getBytes());
120     for (byte[] family : families) {
121       p.add(family, null, "value aaa".getBytes());
122     }
123     table.put(p);
124     p = new Put("bbb".getBytes());
125     for (byte[] family : families) {
126       p.add(family, null, "value bbb".getBytes());
127     }
128     table.put(p);
129     return table;
130   }
131 
132   /**
133    * Verify that the result and key have expected values.
134    *
135    * @param r
136    * @param key
137    * @param expectedKey
138    * @param expectedValue
139    * @return
140    */
141   static boolean checkResult(Result r, ImmutableBytesWritable key,
142       byte[] expectedKey, byte[] expectedValue) {
143     assertEquals(0, key.compareTo(expectedKey));
144     Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
145     byte[] value = vals.values().iterator().next();
146     assertTrue(Arrays.equals(value, expectedValue));
147     return true; // if succeed
148   }
149 
150   /**
151    * Create table data and run tests on specified htable using the
152    * o.a.h.hbase.mapreduce API.
153    *
154    * @param table
155    * @throws IOException
156    * @throws InterruptedException
157    */
158   static void runTestMapreduce(HTable table) throws IOException,
159       InterruptedException {
160     org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
161         new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
162     Scan s = new Scan();
163     s.setStartRow("aaa".getBytes());
164     s.setStopRow("zzz".getBytes());
165     s.addFamily(FAMILY);
166     trr.setScan(s);
167     trr.setHTable(table);
168 
169     trr.initialize(null, null);
170     Result r = new Result();
171     ImmutableBytesWritable key = new ImmutableBytesWritable();
172 
173     boolean more = trr.nextKeyValue();
174     assertTrue(more);
175     key = trr.getCurrentKey();
176     r = trr.getCurrentValue();
177     checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
178 
179     more = trr.nextKeyValue();
180     assertTrue(more);
181     key = trr.getCurrentKey();
182     r = trr.getCurrentValue();
183     checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
184 
185     // no more data
186     more = trr.nextKeyValue();
187     assertFalse(more);
188   }
189 
190   /**
191    * Create a table that IOE's on first scanner next call
192    *
193    * @throws IOException
194    */
195   static HTable createIOEScannerTable(byte[] name, final int failCnt)
196       throws IOException {
197     // build up a mock scanner stuff to fail the first time
198     Answer<ResultScanner> a = new Answer<ResultScanner>() {
199       int cnt = 0;
200 
201       @Override
202       public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
203         // first invocation return the busted mock scanner
204         if (cnt++ < failCnt) {
205           // create mock ResultScanner that always fails.
206           Scan scan = mock(Scan.class);
207           doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
208           ResultScanner scanner = mock(ResultScanner.class);
209           // simulate TimeoutException / IOException
210           doThrow(new IOException("Injected exception")).when(scanner).next();
211           return scanner;
212         }
213 
214         // otherwise return the real scanner.
215         return (ResultScanner) invocation.callRealMethod();
216       }
217     };
218 
219     HTable htable = spy(createTable(name));
220     doAnswer(a).when(htable).getScanner((Scan) anyObject());
221     return htable;
222   }
223 
224   /**
225    * Create a table that throws a DoNoRetryIOException on first scanner next
226    * call
227    *
228    * @throws IOException
229    */
230   static HTable createDNRIOEScannerTable(byte[] name, final int failCnt)
231       throws IOException {
232     // build up a mock scanner stuff to fail the first time
233     Answer<ResultScanner> a = new Answer<ResultScanner>() {
234       int cnt = 0;
235 
236       @Override
237       public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
238         // first invocation return the busted mock scanner
239         if (cnt++ < failCnt) {
240           // create mock ResultScanner that always fails.
241           Scan scan = mock(Scan.class);
242           doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
243           ResultScanner scanner = mock(ResultScanner.class);
244 
245           invocation.callRealMethod(); // simulate UnknownScannerException
246           doThrow(
247               new UnknownScannerException("Injected simulated TimeoutException"))
248               .when(scanner).next();
249           return scanner;
250         }
251 
252         // otherwise return the real scanner.
253         return (ResultScanner) invocation.callRealMethod();
254       }
255     };
256 
257     HTable htable = spy(createTable(name));
258     doAnswer(a).when(htable).getScanner((Scan) anyObject());
259     return htable;
260   }
261 
262   /**
263    * Run test assuming no errors using newer mapreduce api
264    *
265    * @throws IOException
266    * @throws InterruptedException
267    */
268   @Test
269   public void testTableRecordReaderMapreduce() throws IOException,
270       InterruptedException {
271     HTable table = createTable("table1-mr".getBytes());
272     runTestMapreduce(table);
273   }
274 
275   /**
276    * Run test assuming Scanner IOException failure using newer mapreduce api
277    *
278    * @throws IOException
279    * @throws InterruptedException
280    */
281   @Test
282   public void testTableRecordReaderScannerFailMapreduce() throws IOException,
283       InterruptedException {
284     HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1);
285     runTestMapreduce(htable);
286   }
287 
288   /**
289    * Run test assuming Scanner IOException failure using newer mapreduce api
290    *
291    * @throws IOException
292    * @throws InterruptedException
293    */
294   @Test(expected = IOException.class)
295   public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
296       InterruptedException {
297     HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2);
298     runTestMapreduce(htable);
299   }
300 
301   /**
302    * Run test assuming UnknownScannerException (which is a type of
303    * DoNotRetryIOException) using newer mapreduce api
304    *
305    * @throws InterruptedException
306    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
307    */
308   @Test
309   public void testTableRecordReaderScannerTimeoutMapreduce()
310       throws IOException, InterruptedException {
311     HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
312     runTestMapreduce(htable);
313   }
314 
315   /**
316    * Run test assuming UnknownScannerException (which is a type of
317    * DoNotRetryIOException) using newer mapreduce api
318    *
319    * @throws InterruptedException
320    * @throws org.apache.hadoop.hbase.DoNotRetryIOException
321    */
322   @Test(expected = org.apache.hadoop.hbase.DoNotRetryIOException.class)
323   public void testTableRecordReaderScannerTimeoutMapreduceTwice()
324       throws IOException, InterruptedException {
325     HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
326     runTestMapreduce(htable);
327   }
328 
329   /**
330    * Verify the example we present in javadocs on TableInputFormatBase
331    */
332   @Test
333   public void testExtensionOfTableInputFormatBase()
334       throws IOException, InterruptedException, ClassNotFoundException {
335     LOG.info("testing use of an InputFormat taht extends InputFormatBase");
336     final HTable htable = createTable(Bytes.toBytes("exampleTable"),
337       new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
338 
339     final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
340     job.setInputFormatClass(ExampleTIF.class);
341     job.setOutputFormatClass(NullOutputFormat.class);
342     job.setMapperClass(ExampleVerifier.class);
343     job.setNumReduceTasks(0);
344 
345     LOG.debug("submitting job.");
346     assertTrue("job failed!", job.waitForCompletion(true));
347     assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
348         .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
349     assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
350         .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
351     assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
352         .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
353     assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
354         .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
355     assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
356         .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
357     assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
358         .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
359   }
360 
361   public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
362 
363     @Override
364     public void map(ImmutableBytesWritable key, Result value, Context context)
365         throws IOException {
366       for (Cell cell : value.listCells()) {
367         context.getCounter(TestTableInputFormat.class.getName() + ":row",
368             Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
369             .increment(1l);
370         context.getCounter(TestTableInputFormat.class.getName() + ":family",
371             Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
372             .increment(1l);
373         context.getCounter(TestTableInputFormat.class.getName() + ":value",
374             Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
375             .increment(1l);
376       }
377     }
378 
379   }
380 
381   public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
382 
383     @Override
384     public void configure(JobConf job) {
385       try {
386         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
387           Bytes.toBytes("exampleTable"));
388         // mandatory
389         setHTable(exampleTable);
390         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
391           Bytes.toBytes("columnB") };
392         // optional
393         Scan scan = new Scan();
394         for (byte[] family : inputColumns) {
395           scan.addFamily(family);
396         }
397         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
398         scan.setFilter(exampleFilter);
399         setScan(scan);
400       } catch (IOException exception) {
401         throw new RuntimeException("Failed to configure for job.", exception);
402       }
403     }
404 
405   }
406 
407 }
408