1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.filter.Filter;
31 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32 import org.apache.hadoop.hbase.regionserver.HRegion;
33 import org.apache.hadoop.mapred.InputFormat;
34 import org.apache.hadoop.mapred.InputSplit;
35 import org.apache.hadoop.mapred.JobConf;
36 import org.apache.hadoop.mapred.RecordReader;
37 import org.apache.hadoop.mapred.Reporter;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @Deprecated
71 @InterfaceAudience.Public
72 @InterfaceStability.Stable
73 public abstract class TableInputFormatBase
74 implements InputFormat<ImmutableBytesWritable, Result> {
75 final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
76 private byte [][] inputColumns;
77 private HTable table;
78 private TableRecordReader tableRecordReader;
79 private Filter rowFilter;
80
81
82
83
84
85
86
87
88 public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
89 InputSplit split, JobConf job, Reporter reporter)
90 throws IOException {
91 TableSplit tSplit = (TableSplit) split;
92 TableRecordReader trr = this.tableRecordReader;
93
94 if (trr == null) {
95 trr = new TableRecordReader();
96 }
97 trr.setStartRow(tSplit.getStartRow());
98 trr.setEndRow(tSplit.getEndRow());
99 trr.setHTable(this.table);
100 trr.setInputColumns(this.inputColumns);
101 trr.setRowFilter(this.rowFilter);
102 trr.init();
103 return trr;
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
124 if (this.table == null) {
125 throw new IOException("No table was provided");
126 }
127 byte [][] startKeys = this.table.getStartKeys();
128 if (startKeys == null || startKeys.length == 0) {
129 throw new IOException("Expecting at least one region");
130 }
131 if (this.inputColumns == null || this.inputColumns.length == 0) {
132 throw new IOException("Expecting at least one column");
133 }
134 int realNumSplits = numSplits > startKeys.length? startKeys.length:
135 numSplits;
136 InputSplit[] splits = new InputSplit[realNumSplits];
137 int middle = startKeys.length / realNumSplits;
138 int startPos = 0;
139 for (int i = 0; i < realNumSplits; i++) {
140 int lastPos = startPos + middle;
141 lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
142 String regionLocation = table.getRegionLocation(startKeys[startPos]).
143 getHostname();
144 splits[i] = new TableSplit(this.table.getName(),
145 startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
146 HConstants.EMPTY_START_ROW, regionLocation);
147 LOG.info("split: " + i + "->" + splits[i]);
148 startPos = lastPos;
149 }
150 return splits;
151 }
152
153
154
155
156 protected void setInputColumns(byte [][] inputColumns) {
157 this.inputColumns = inputColumns;
158 }
159
160
161
162
163 protected HTable getHTable() {
164 return this.table;
165 }
166
167
168
169
170
171
172 protected void setHTable(HTable table) {
173 this.table = table;
174 }
175
176
177
178
179
180
181
182 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
183 this.tableRecordReader = tableRecordReader;
184 }
185
186
187
188
189
190
191 protected void setRowFilter(Filter rowFilter) {
192 this.rowFilter = rowFilter;
193 }
194 }