/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  private final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";

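  /*
   * Illustrative sketch only: a driver can set these keys directly on the job
   * configuration before submitting; the table name "mytable", family "info",
   * and job name are placeholders, not values used anywhere in this class.
   * Higher-level jobs typically go through TableMapReduceUtil instead.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.set(TableInputFormat.INPUT_TABLE, "mytable");
   *   conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "info");
   *   conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");
   *   Job job = Job.getInstance(conf, "scan-mytable");
   *   job.setInputFormatClass(TableInputFormat.class);
   */
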
  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    String tableName = conf.get(INPUT_TABLE);
    try {
      setHTable(new HTable(new Configuration(conf), tableName));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_ROW_START) != null) {
          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
        }

        if (conf.get(SCAN_ROW_STOP) != null) {
          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
        }

        if (conf.get(SCAN_COLUMNS) != null) {
          addColumns(scan, conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        if (conf.get(SCAN_BATCHSIZE) != null) {
          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
        }

        // false by default; full table scans generate too much block cache churn
        scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }
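
  /*
   * Illustrative sketch only: the SCAN property consumed above is usually
   * produced by TableMapReduceUtil, which serializes a Scan via
   * convertScanToString(Scan) and configures this input format on the job.
   * "mytable" and MyMapper are placeholders for a real table and TableMapper.
   *
   *   Scan scan = new Scan();
   *   scan.setCaching(500);
   *   scan.setCacheBlocks(false);
   *   TableMapReduceUtil.initTableMapperJob("mytable", scan,
   *       MyMapper.class, ImmutableBytesWritable.class, Result.class, job);
   */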

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
   * input.
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte [][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }
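
  /*
   * Illustrative sketch only: a caller can restrict a scan to specific columns
   * using the old family:qualifier notation; "info:name" and "info:age" are
   * placeholder column names.
   *
   *   Scan scan = new Scan();
   *   TableInputFormat.addColumns(scan, new byte[][] {
   *       Bytes.toBytes("info:name"), Bytes.toBytes("info:age") });
   */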

  /**
   * Convenience method to parse a string representation of an array of column specifiers.
   *
   * @param scan The Scan to update.
   * @param columns  The columns to parse, space delimited.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      HTable rl = new HTable(getConf(), splitTableName);
      try {
        return rl.getStartEndKeys();
      } finally {
        rl.close();
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets the split table in the map-reduce job. Start keys of this table will
   * be used to generate the input splits (see {@link #SPLIT_TABLE}).
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
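
  /*
   * Illustrative sketch only: when preparing data for bulk load, a driver can
   * make the job's splits mirror an existing table's regions; "target_table"
   * is a placeholder name.
   *
   *   TableInputFormat.configureSplitTable(job, TableName.valueOf("target_table"));
   */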
}