/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/**
 * A base for {@link TableInputFormat}s. Receives an {@link HTable}, a
 * byte[][] of input columns and optionally a {@link Filter}.
 * Subclasses may use other TableRecordReader implementations.
 * <p>
 * An example of a subclass:
 * <pre>
 *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
 *
 *     public void configure(JobConf job) {
 *       try {
 *         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
 *           Bytes.toBytes("exampleTable"));
 *         // mandatory
 *         setHTable(exampleTable);
 *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
 *           Bytes.toBytes("columnB") };
 *         // mandatory
 *         setInputColumns(inputColumns);
 *         // optional
 *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL,
 *           new RegexStringComparator("aa.*"));
 *         setRowFilter(exampleFilter);
 *       } catch (IOException exception) {
 *         throw new RuntimeException("Failed to configure for job.", exception);
 *       }
 *     }
 *   }
 * </pre>
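 * <p>
 * A driver can then plug the subclass into a {@link JobConf}. The snippet
 * below is a minimal sketch only; the job name and the mapper class
 * (<code>ExampleMapper</code>) are hypothetical, not part of this API:
 * <pre>
 *   JobConf job = new JobConf(HBaseConfiguration.create(), ExampleTIF.class);
 *   job.setJobName("exampleJob");
 *   job.setInputFormat(ExampleTIF.class);
 *   job.setMapperClass(ExampleMapper.class);  // hypothetical Mapper
 *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 *   job.setMapOutputValueClass(Result.class);
 * </pre>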
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class TableInputFormatBase
    implements InputFormat<ImmutableBytesWritable, Result> {
  final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
  private byte [][] inputColumns;
  private HTable table;
  private TableRecordReader tableRecordReader;
  private Filter rowFilter;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
   * the default.
   *
   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
   *      JobConf, Reporter)
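   * <p>
   * A subclass wanting different reader behavior can install its own reader
   * before this method runs; <code>MyTableRecordReader</code> below is a
   * hypothetical {@link TableRecordReader} subclass, named only to
   * illustrate the hook:
   * <pre>
   *   // e.g. in a subclass's configure(JobConf):
   *   setTableRecordReader(new MyTableRecordReader());
   * </pre>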
   */
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter)
  throws IOException {
    TableSplit tSplit = (TableSplit) split;
    TableRecordReader trr = this.tableRecordReader;
    // if no table record reader was provided use default
    if (trr == null) {
      trr = new TableRecordReader();
    }
    trr.setStartRow(tSplit.getStartRow());
    trr.setEndRow(tSplit.getEndRow());
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    trr.setRowFilter(this.rowFilter);
    trr.init();
    return trr;
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
   * <p>
   * The number of splits created is the smaller of numSplits and the number
   * of {@link HRegion}s in the table. If the number of splits is smaller
   * than the number of {@link HRegion}s, then splits span multiple
   * {@link HRegion}s, and regions are grouped as evenly as possible. When
   * the grouping is uneven, the larger splits are placed first in the
   * {@link InputSplit} array.
   *
   * @param job the map task {@link JobConf}
   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
   *
   * @return the input splits
   *
   * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
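   * <p>
   * For example, a table with 10 regions and <code>numSplits = 4</code>
   * yields 4 splits covering 3, 3, 2 and 2 regions respectively:
   * 10 / 4 = 2 regions per split, and the remainder of 2 is spread over
   * the first two splits.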
   */
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    if (this.table == null) {
      throw new IOException("No table was provided");
    }
    byte [][] startKeys = this.table.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
      throw new IOException("Expecting at least one region");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
    int realNumSplits = numSplits > startKeys.length ? startKeys.length :
      numSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int middle = startKeys.length / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
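      // front-load the remainder: the first (startKeys.length % realNumSplits)
      // splits each cover one extra region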
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
      String regionLocation = table.getRegionLocation(startKeys[startPos]).
        getHostname();
      splits[i] = new TableSplit(this.table.getName(),
        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
          HConstants.EMPTY_START_ROW, regionLocation);
      LOG.info("split: " + i + "->" + splits[i]);
      startPos = lastPos;
    }
    return splits;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}s passed
   *          to the map task.
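   *          In this API a column is typically specified as a family name,
   *          optionally qualified as <code>family:qualifier</code>, e.g.
   *          <code>Bytes.toBytes("columnA:qualifier")</code> (format is an
   *          assumption based on common usage of this legacy API).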
   */
  protected void setInputColumns(byte [][] inputColumns) {
    this.inputColumns = inputColumns;
  }

  /**
   * Allows subclasses to get the {@link HTable}.
   *
   * @return the {@link HTable} in use
   */
  protected HTable getHTable() {
    return this.table;
  }

  /**
   * Allows subclasses to set the {@link HTable}.
   *
   * @param table the {@link HTable} to get the data from
   */
  protected void setHTable(HTable table) {
    this.table = table;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader to provide other {@link TableRecordReader}
   *          implementations.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }

  /**
   * Allows subclasses to set the {@link Filter} to be used.
   *
   * @param rowFilter the {@link Filter} to apply to rows
   */
  protected void setRowFilter(Filter rowFilter) {
    this.rowFilter = rowFilter;
  }
}