View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
49  
50  
51  /**
52   * <p>
53   * Tests various scan start and stop row scenarios. This is set in a scan and
54   * tested in a MapReduce job to see if that is handed over and done properly
55   * too.
56   * </p>
57   * <p>
58   * This test is broken into two parts in order to side-step the test timeout
59   * period of 900, as documented in HBASE-8326.
60   * </p>
61   */
62  public abstract class TestTableInputFormatScanBase {
63  
64    static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
65    static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
66  
67    static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
68    static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
69    static final String KEY_STARTROW = "startRow";
70    static final String KEY_LASTROW = "stpRow";
71  
72    private static HTable table = null;
73  
74    @BeforeClass
75    public static void setUpBeforeClass() throws Exception {
76      // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
77      // this turns it off for this test.  TODO: Figure out why scr breaks recovery. 
78      System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
79  
80      // switch TIF to log at DEBUG level
81      TEST_UTIL.enableDebug(TableInputFormat.class);
82      TEST_UTIL.enableDebug(TableInputFormatBase.class);
83      // start mini hbase cluster
84      TEST_UTIL.startMiniCluster(3);
85      // create and fill table
86      table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
87      TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
88      TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
89      // start MR cluster
90      TEST_UTIL.startMiniMapReduceCluster();
91    }
92  
93    @AfterClass
94    public static void tearDownAfterClass() throws Exception {
95      TEST_UTIL.shutdownMiniMapReduceCluster();
96      TEST_UTIL.shutdownMiniCluster();
97    }
98  
99    /**
100    * Pass the key and value to reduce.
101    */
102   public static class ScanMapper
103   extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
104 
105     /**
106      * Pass the key and value to reduce.
107      *
108      * @param key  The key, here "aaa", "aab" etc.
109      * @param value  The value is the same as the key.
110      * @param context  The task context.
111      * @throws IOException When reading the rows fails.
112      */
113     @Override
114     public void map(ImmutableBytesWritable key, Result value,
115       Context context)
116     throws IOException, InterruptedException {
117       if (value.size() != 1) {
118         throw new IOException("There should only be one input column");
119       }
120       Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
121         cf = value.getMap();
122       if(!cf.containsKey(INPUT_FAMILY)) {
123         throw new IOException("Wrong input columns. Missing: '" +
124           Bytes.toString(INPUT_FAMILY) + "'.");
125       }
126       String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
127       LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
128         ", value -> " + val);
129       context.write(key, key);
130     }
131 
132   }
133 
134   /**
135    * Checks the last and first key seen against the scanner boundaries.
136    */
137   public static class ScanReducer
138   extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
139                   NullWritable, NullWritable> {
140 
141     private String first = null;
142     private String last = null;
143 
144     protected void reduce(ImmutableBytesWritable key,
145         Iterable<ImmutableBytesWritable> values, Context context)
146     throws IOException ,InterruptedException {
147       int count = 0;
148       for (ImmutableBytesWritable value : values) {
149         String val = Bytes.toStringBinary(value.get());
150         LOG.info("reduce: key[" + count + "] -> " +
151           Bytes.toStringBinary(key.get()) + ", value -> " + val);
152         if (first == null) first = val;
153         last = val;
154         count++;
155       }
156     }
157 
158     protected void cleanup(Context context)
159     throws IOException, InterruptedException {
160       Configuration c = context.getConfiguration();
161       String startRow = c.get(KEY_STARTROW);
162       String lastRow = c.get(KEY_LASTROW);
163       LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
164       LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
165       if (startRow != null && startRow.length() > 0) {
166         assertEquals(startRow, first);
167       }
168       if (lastRow != null && lastRow.length() > 0) {
169         assertEquals(lastRow, last);
170       }
171     }
172 
173   }
174 
175   /**
176    * Tests an MR Scan initialized from properties set in the Configuration.
177    * 
178    * @throws IOException
179    * @throws ClassNotFoundException
180    * @throws InterruptedException
181    */
182   protected void testScanFromConfiguration(String start, String stop, String last)
183   throws IOException, InterruptedException, ClassNotFoundException {
184     String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
185       "To" + (stop != null ? stop.toUpperCase() : "Empty");
186     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
187     c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
188     c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
189     c.set(KEY_STARTROW, start != null ? start : "");
190     c.set(KEY_LASTROW, last != null ? last : "");
191 
192     if (start != null) {
193       c.set(TableInputFormat.SCAN_ROW_START, start);
194     }
195 
196     if (stop != null) {
197       c.set(TableInputFormat.SCAN_ROW_STOP, stop);
198     }
199 
200     Job job = new Job(c, jobName);
201     job.setMapperClass(ScanMapper.class);
202     job.setReducerClass(ScanReducer.class);
203     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
204     job.setMapOutputValueClass(ImmutableBytesWritable.class);
205     job.setInputFormatClass(TableInputFormat.class);
206     job.setNumReduceTasks(1);
207     FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
208     TableMapReduceUtil.addDependencyJars(job);
209     assertTrue(job.waitForCompletion(true));
210   }
211 
212   /**
213    * Tests a MR scan using specific start and stop rows.
214    *
215    * @throws IOException
216    * @throws ClassNotFoundException
217    * @throws InterruptedException
218    */
219   protected void testScan(String start, String stop, String last)
220   throws IOException, InterruptedException, ClassNotFoundException {
221     String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
222       "To" + (stop != null ? stop.toUpperCase() : "Empty");
223     LOG.info("Before map/reduce startup - job " + jobName);
224     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
225     Scan scan = new Scan();
226     scan.addFamily(INPUT_FAMILY);
227     if (start != null) {
228       scan.setStartRow(Bytes.toBytes(start));
229     }
230     c.set(KEY_STARTROW, start != null ? start : "");
231     if (stop != null) {
232       scan.setStopRow(Bytes.toBytes(stop));
233     }
234     c.set(KEY_LASTROW, last != null ? last : "");
235     LOG.info("scan before: " + scan);
236     Job job = new Job(c, jobName);
237     TableMapReduceUtil.initTableMapperJob(
238       Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
239       ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
240     job.setReducerClass(ScanReducer.class);
241     job.setNumReduceTasks(1); // one to get final "first" and "last" key
242     FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
243     LOG.info("Started " + job.getJobName());
244     assertTrue(job.waitForCompletion(true));
245     LOG.info("After map/reduce completion - job " + jobName);
246   }
247 
248 
249   /**
250    * Tests a MR scan using data skew auto-balance
251    *
252    * @throws IOException
253    * @throws ClassNotFoundException
254    * @throws InterruptedException
255    */
256   public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
257           InterruptedException,
258           ClassNotFoundException {
259     String jobName = "TestJobForNumOfSplits";
260     LOG.info("Before map/reduce startup - job " + jobName);
261     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
262     Scan scan = new Scan();
263     scan.addFamily(INPUT_FAMILY);
264     c.set("hbase.mapreduce.input.autobalance", "true");
265     c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
266     c.set(KEY_STARTROW, "");
267     c.set(KEY_LASTROW, "");
268     Job job = new Job(c, jobName);
269     TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
270             ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
271     TableInputFormat tif = new TableInputFormat();
272     tif.setConf(job.getConfiguration());
273     Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
274     List<InputSplit> splits = tif.getSplits(job);
275     Assert.assertEquals(expectedNumOfSplits, splits.size());
276   }
277 
278   /**
279    * Tests for the getSplitKey() method in TableInputFormatBase.java
280    */
281   public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
282     byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
283       Assert.assertArrayEquals(splitKey, result);
284   }
285 }
286