/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";

  /**
   * If specified, use start keys of this table to split. This is useful when
   * you are preparing data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";

  /**
   * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is
   * specified. See {@link TableMapReduceUtil#convertScanToString(Scan)} for
   * more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
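
  /*
   * Illustrative sketch (not part of the original source): the SCAN key
   * carries a whole serialized Scan, produced by
   * TableMapReduceUtil.convertScanToString(Scan), which throws IOException.
   * The family name "info" below is a made-up example value.
   *
   *   Scan scan = new Scan();
   *   scan.addFamily(Bytes.toBytes("info"));
   *   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
   */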

  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";

  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";

  /** Column Family to Scan */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";

  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";

  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";

  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";

  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";

  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";

  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";

  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";

  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    String tableName = conf.get(INPUT_TABLE);
    try {
      setHTable(new HTable(new Configuration(conf), tableName));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_ROW_START) != null) {
          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
        }

        if (conf.get(SCAN_ROW_STOP) != null) {
          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
        }

        if (conf.get(SCAN_COLUMNS) != null) {
          addColumns(scan, conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        if (conf.get(SCAN_BATCHSIZE) != null) {
          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
        }

        // false by default, full table scans generate too much BC churn
        scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }
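
  /*
   * Illustrative sketch (not part of the original source): a driver can feed
   * this input format purely through configuration keys, letting setConf build
   * the Scan. The table name "mytable" and the column "info:name" are made-up
   * example values.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.set(TableInputFormat.INPUT_TABLE, "mytable");
   *   conf.set(TableInputFormat.SCAN_COLUMNS, "info:name");
   *   conf.set(TableInputFormat.SCAN_MAXVERSIONS, "1");
   *   Job job = Job.getInstance(conf, "scan mytable");
   *   job.setInputFormatClass(TableInputFormat.class);
   */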

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addFamily(byte[])} for any
   * families in the input.
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   */
  public static void addColumns(Scan scan, byte [][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Convenience method to parse a string representation of an array of column
   * specifiers.
   *
   * @param scan The Scan to update.
   * @param columns  The columns to parse.
   * @throws IllegalArgumentException When the column is not valid.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }
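
  /*
   * Illustrative sketch (not part of the original source): the string form is
   * a space-delimited list mixing bare families and family:qualifier pairs.
   * The names below are made-up example values.
   *
   *   // Scans all of family "d" plus only "info:name" and "info:age".
   *   addColumns(scan, "d info:name info:age");
   */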

  /**
   * If {@link #SPLIT_TABLE} is set, use the start and end keys of that table
   * for splitting instead of the input table's own region boundaries.
   */
  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      HTable rl = new HTable(getConf(), splitTableName);
      try {
        return rl.getStartEndKeys();
      } finally {
        rl.close();
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets split table in map-reduce job.
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
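
  /*
   * Illustrative sketch (not part of the original source): when preparing
   * data for bulkload, a job can reuse the region boundaries of the target
   * table for its splits. The table name "target_table" is a made-up value.
   *
   *   TableInputFormat.configureSplitTable(job,
   *       TableName.valueOf("target_table"));
   */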
}