/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.naming.NamingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils;

/**
 * A base for {@link TableInputFormat}s. Receives a {@link HTable} and a
 * {@link Scan} instance that defines the input columns etc. Subclasses may use
 * other TableRecordReader implementations.
 * <p>
 * An example of a subclass:
 * <pre>
 *   public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
 *
 *     &#64;Override
 *     public void configure(JobConf job) {
 *       try {
 *         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
 *           Bytes.toBytes("exampleTable"));
 *         // mandatory
 *         setHTable(exampleTable);
 *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
 *           Bytes.toBytes("columnB") };
 *         // optional
 *         Scan scan = new Scan();
 *         for (byte[] family : inputColumns) {
 *           scan.addFamily(family);
 *         }
 *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
 *         scan.setFilter(exampleFilter);
 *         setScan(scan);
 *       } catch (IOException exception) {
 *         throw new RuntimeException("Failed to configure for job.", exception);
 *       }
 *     }
 *   }
 * </pre>
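 * <p>
 * Auto-balancing of the generated splits is disabled by default. As a minimal sketch of how
 * it could be switched on ({@code job} here stands for the MapReduce job being configured;
 * only the property names below are defined by this class):
 * <pre>
 *   Configuration conf = job.getConfiguration();
 *   conf.setBoolean("hbase.mapreduce.input.autobalance", true);
 *   // optional: maximum data skew ratio (default 3) and text vs. binary row keys (default true)
 *   conf.setLong("hbase.mapreduce.input.autobalance.maxskewratio", 3);
 *   conf.setBoolean("hbase.table.row.textkey", true);
 * </pre>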
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class TableInputFormatBase
extends InputFormat<ImmutableBytesWritable, Result> {

  /** Specify whether auto-balancing of the input splits is enabled for M/R jobs.*/
  public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
  /** Specify the maximum ratio of data skew tolerated in M/R jobs; it only takes effect when
   * the hbase.mapreduce.input.autobalance property is enabled.*/
  public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
          ".maxskewratio";
  /** Specify whether the row keys in the table are text (ASCII between 32 and 126);
   * the default is true. False means the table uses binary row keys.*/
  public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";

  final Log LOG = LogFactory.getLog(TableInputFormatBase.class);

  /** Holds the details for the internal scanner. */
  private Scan scan = null;
  /** The table to scan. */
  private HTable table = null;
  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
  private HashMap<InetAddress, String> reverseDNSCacheMap =
    new HashMap<InetAddress, String>();

  /** The NameServer address */
  private String nameServer = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
   * the default.
   *
   * @param split  The split to work with.
   * @param context  The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *   org.apache.hadoop.mapreduce.InputSplit,
   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
  throws IOException {
    if (table == null) {
      throw new IOException("Cannot create a record reader because of a" +
          " previous error. Please look at the previous log lines from" +
          " the task's full log for more details.");
    }
    TableSplit tSplit = (TableSplit) split;
    LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
    TableRecordReader trr = this.tableRecordReader;
    // if no table record reader was provided use default
    if (trr == null) {
      trr = new TableRecordReader();
    }
    Scan sc = new Scan(this.scan);
    sc.setStartRow(tSplit.getStartRow());
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setHTable(table);
    return trr;
  }

  protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
    return table.getStartEndKeys();
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table.
   *
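   * For example, with a scan whose start row is "bbb" and stop row is "ddd", and a table
   * whose regions have the boundaries ["", "ccc"), ["ccc", "ggg") and ["ggg", ""), two splits
   * are produced: ["bbb", "ccc") and ["ccc", "ddd"); the last region falls entirely outside
   * the scanned range and is skipped.
   *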
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (table == null) {
      throw new IOException("No table was provided.");
    }
    // Get the name server address and the default value is null.
    this.nameServer =
      context.getConfiguration().get("hbase.nameserver.address", null);

    RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);

    Pair<byte[][], byte[][]> keys = getStartEndKeys();
    if (keys == null || keys.getFirst() == null ||
        keys.getFirst().length == 0) {
      HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
      if (null == regLoc) {
        throw new IOException("Expecting at least one region.");
      }
      List<InputSplit> splits = new ArrayList<InputSplit>(1);
      long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
      TableSplit split = new TableSplit(table.getName(),
          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
      splits.add(split);
      return splits;
    }
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }
      HRegionLocation location = table.getRegionLocation(keys.getFirst()[i], false);
      // The below InetSocketAddress creation does a name resolution.
      InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
      if (isa.isUnresolved()) {
        LOG.warn("Failed to resolve " + isa);
      }
      InetAddress regionAddress = isa.getAddress();
      String regionLocation;
      try {
        regionLocation = reverseDNS(regionAddress);
      } catch (NamingException e) {
        LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
        regionLocation = location.getHostname();
      }

      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start and stop keys fall into the region
      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
          (stopRow.length == 0 ||
           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
        byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
            keys.getFirst()[i] : startRow;
        byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
            keys.getSecond()[i] : stopRow;

        byte[] regionName = location.getRegionInfo().getRegionName();
        long regionSize = sizeCalculator.getRegionSize(regionName);
        TableSplit split = new TableSplit(table.getName(),
          splitStart, splitStop, regionLocation, regionSize);
        splits.add(split);
        if (LOG.isDebugEnabled()) {
          LOG.debug("getSplits: split -> " + i + " -> " + split);
        }
      }
    }
    // The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
    boolean enableAutoBalance = context.getConfiguration().getBoolean(
      MAPREDUCE_INPUT_AUTOBALANCE, false);
    if (enableAutoBalance) {
      long totalRegionSize = 0;
      for (int i = 0; i < splits.size(); i++) {
        TableSplit ts = (TableSplit) splits.get(i);
        totalRegionSize += ts.getLength();
      }
      long averageRegionSize = totalRegionSize / splits.size();
      // the averageRegionSize must be positive.
      if (averageRegionSize <= 0) {
        LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " +
            "set it to 1.");
        averageRegionSize = 1;
      }
      return calculateRebalancedSplits(splits, context, averageRegionSize);
    } else {
      return splits;
    }
  }

  public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
      String ipAddressString = null;
      try {
        ipAddressString = DNS.reverseDns(ipAddress, null);
      } catch (Exception e) {
        // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
        // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
        // reverse DNS using jndi doesn't work well with ipv6 addresses.
        ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
      }
      if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
      hostName = Strings.domainNamePointerToHostName(ipAddressString);
      this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
  }

  /**
   * Calculates the number of MapReduce input splits for the map tasks. The number of
   * MapReduce input splits depends on the average region size and the "data skew ratio"
   * the user sets in the configuration.
   *
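   * For example, with an average region size of 100 MB and the default skew ratio of 3
   * (so a skew threshold of 300 MB), a 500 MB region is split into two input splits of
   * 250 MB each, a 150 MB region becomes one input split, and a run of consecutive 40 MB
   * regions is combined into one input split as long as the combined size stays within
   * the 300 MB threshold.
   *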
   * @param list  The list of input splits before balance.
   * @param context  The current job context.
   * @param average  The average size of all regions.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
                                               long average) throws IOException {
    List<InputSplit> resultList = new ArrayList<InputSplit>();
    Configuration conf = context.getConfiguration();
    // The default data skew ratio is 3
    long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
    // It determines which mode to use: text key mode or binary key mode. The default is text mode.
    boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
    long dataSkewThreshold = dataSkewRatio * average;
    int count = 0;
    while (count < list.size()) {
      TableSplit ts = (TableSplit) list.get(count);
      String regionLocation = ts.getRegionLocation();
      long regionSize = ts.getLength();
      if (regionSize >= dataSkewThreshold) {
        // if the current region size is larger than the data skew threshold,
        // split the region into two MapReduce input splits.
        byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
        // Set the size of each child TableSplit to 1/2 of the region size. The exact size of the
        // MapReduce input splits is not far off.
        TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
                regionSize / 2);
        TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
                regionSize - regionSize / 2);
        resultList.add(t1);
        resultList.add(t2);
        count++;
      } else if (regionSize >= average) {
        // if the region size is between the average size and the data skew threshold,
        // make this region one MapReduce input split.
        resultList.add(ts);
        count++;
      } else {
        // if the region size is smaller than the average region size, combine it with the
        // following regions into one MapReduce input split, as long as the combined size
        // stays within the data skew threshold.
        long totalSize = regionSize;
        byte[] splitStartKey = ts.getStartRow();
        byte[] splitEndKey = ts.getEndRow();
        count++;
        for (; count < list.size(); count++) {
          TableSplit nextRegion = (TableSplit) list.get(count);
          long nextRegionSize = nextRegion.getLength();
          if (totalSize + nextRegionSize <= dataSkewThreshold) {
            totalSize = totalSize + nextRegionSize;
            splitEndKey = nextRegion.getEndRow();
          } else {
            break;
          }
        }
        TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
                regionLocation, totalSize);
        resultList.add(t);
      }
    }
    return resultList;
  }

  /**
   * Selects a split point in the region. The selection of the split point is based on a uniform
   * distribution assumption for the keys in a region.
   * Here are some examples:
   * startKey: aaabcdefg  endKey: aaafff    split point: aaad
   * startKey: 111000  endKey: 1125790    split point: 111b
   * startKey: 1110  endKey: 1120    split point: 111_
   * startKey: binary key { 13, -19, 126, 127 }, endKey: binary key { 13, -19, 127, 0 },
   * split point: binary key { 13, -19, 127, -64 }
   * This method is public static so that it is easier to test.
   *
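   * For instance, the first example above corresponds to the call (a usage sketch only):
   * <pre>
   *   byte[] splitPoint = getSplitKey(Bytes.toBytes("aaabcdefg"), Bytes.toBytes("aaafff"), true);
   *   // splitPoint holds the bytes of "aaad"
   * </pre>
   *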
   * @param start Start key of the region
   * @param end End key of the region
   * @param isText Whether to use text key mode or binary key mode
   * @return The split point in the region.
   */
  public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
    byte upperLimitByte;
    byte lowerLimitByte;
    // Use text mode or binary mode.
    if (isText) {
      // The range of text chars in ASCII is [32,126]; the lower limit is space and the upper
      // limit is '~'.
      upperLimitByte = '~';
      lowerLimitByte = ' ';
    } else {
      upperLimitByte = Byte.MAX_VALUE;
      lowerLimitByte = Byte.MIN_VALUE;
    }
    // For special cases
    // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
    // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
    if (start.length == 0 && end.length == 0) {
      return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
    }
    if (start.length == 0 && end.length != 0) {
      return new byte[]{ end[0] };
    }
    if (start.length != 0 && end.length == 0) {
      byte[] result = new byte[start.length];
      result[0] = start[0];
      for (int k = 1; k < start.length; k++) {
        result[k] = upperLimitByte;
      }
      return result;
    }
    // A list to store the bytes of the split key
    List<Byte> resultBytesList = new ArrayList<Byte>();
    int maxLength = start.length > end.length ? start.length : end.length;
    for (int i = 0; i < maxLength; i++) {
      // calculate the midpoint byte at the first difference
      // for example: "11ae" and "11chw", the midpoint is "11b"
      // another example: "11ae" and "11bhw", the first differing bytes are 'a' and 'b';
      // there is no midpoint between 'a' and 'b', so we need to check the next byte.
      if (start[i] == end[i]) {
        resultBytesList.add(start[i]);
        // For a special case like: startKey="aaa", endKey="aaaz", splitKey="aaaM"
        if (i + 1 == start.length) {
          resultBytesList.add((byte) ((lowerLimitByte + end[i + 1]) / 2));
          break;
        }
      } else {
        // if the two bytes differ by 1, like ['a','b'], we need to check the next byte to find
        // the midpoint.
        if ((int) end[i] - (int) start[i] == 1) {
          // get the next byte after the first difference
          byte startNextByte = (i + 1 < start.length) ? start[i + 1] : lowerLimitByte;
          byte endNextByte = (i + 1 < end.length) ? end[i + 1] : lowerLimitByte;
          int byteRange = (upperLimitByte - startNextByte) + (endNextByte - lowerLimitByte) + 1;
          int halfRange = byteRange / 2;
          if ((int) startNextByte + halfRange > (int) upperLimitByte) {
            resultBytesList.add(end[i]);
            resultBytesList.add((byte) (startNextByte + halfRange - upperLimitByte +
                    lowerLimitByte));
          } else {
            resultBytesList.add(start[i]);
            resultBytesList.add((byte) (startNextByte + halfRange));
          }
        } else {
          // calculate the midpoint key by the first differing byte (normal case),
          // like "11ae" and "11chw", the midpoint is "11b"
          resultBytesList.add((byte) ((start[i] + end[i]) / 2));
        }
        break;
      }
    }
    // transform the List of bytes to byte[]
    byte[] result = new byte[resultBytesList.size()];
    for (int k = 0; k < resultBytesList.size(); k++) {
      result[k] = (byte) resultBytesList.get(k);
    }
    return result;
  }

  /**
   * Test if the given region is to be included in the InputSplit while splitting
   * the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude an entire region
   * from the M-R job (and hence it does not contribute an InputSplit), based on the region's
   * start and end keys. <br>
   * It is useful when we need to remember the last-processed top record and continuously revisit
   * the [last, current) interval for M-R processing. In addition to reducing the number of
   * InputSplits, it also reduces the load on the region server, due to the ordering of the keys.
   * <br>
   * <br>
   * Note: it is possible that <code>endKey.length() == 0</code> for the last (most recent) region.
   * <br>
   * Override this method if you want to bulk exclude regions altogether from M-R.
   * By default, no region is excluded (i.e. all regions are included).
   *
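   * A minimal sketch of such an override ({@code lastProcessedRow} is only illustrative and
   * not a field of this class):
   * <pre>
   *   &#64;Override
   *   protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
   *     // only keep regions that may still contain rows at or after the last processed row
   *     return endKey.length == 0 || Bytes.compareTo(endKey, lastProcessedRow) > 0;
   *   }
   * </pre>
   *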
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input (default).
   *
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the {@link HTable}.
   */
  protected HTable getHTable() {
    return this.table;
  }

  /**
   * Allows subclasses to set the {@link HTable}.
   *
   * @param table  The table to get the data from.
   */
  protected void setHTable(HTable table) {
    this.table = table;
  }

  /**
   * Gets the scan defining the actual details like columns etc.
   *
   * @return The internal scan instance.
   */
  public Scan getScan() {
    if (this.scan == null) this.scan = new Scan();
    return scan;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *   implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}