1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.lang.reflect.Method;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.ResultScanner;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.client.ScannerCallable;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.mapreduce.Counter;
39 import org.apache.hadoop.mapreduce.InputSplit;
40 import org.apache.hadoop.mapreduce.TaskAttemptContext;
41 import org.apache.hadoop.util.StringUtils;
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class TableRecordReaderImpl {
50 public static final String LOG_PER_ROW_COUNT
51 = "hbase.mapreduce.log.scanner.rowcount";
52
53 static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
54
55
56 private static final String HBASE_COUNTER_GROUP_NAME =
57 "HBase Counters";
58 private ResultScanner scanner = null;
59 private Scan scan = null;
60 private Scan currentScan = null;
61 private HTable htable = null;
62 private byte[] lastSuccessfulRow = null;
63 private ImmutableBytesWritable key = null;
64 private Result value = null;
65 private TaskAttemptContext context = null;
66 private Method getCounter = null;
67 private long numRestarts = 0;
68 private long timestamp;
69 private int rowcount;
70 private boolean logScannerActivity = false;
71 private int logPerRowCount = 100;
72
73
74
75
76
77
78
79 public void restart(byte[] firstRow) throws IOException {
80 currentScan = new Scan(scan);
81 currentScan.setStartRow(firstRow);
82 currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
83 Bytes.toBytes(Boolean.TRUE));
84 if (this.scanner != null) {
85 if (logScannerActivity) {
86 LOG.info("Closing the previously opened scanner object.");
87 }
88 this.scanner.close();
89 }
90 this.scanner = this.htable.getScanner(currentScan);
91 if (logScannerActivity) {
92 LOG.info("Current scan=" + currentScan.toString());
93 timestamp = System.currentTimeMillis();
94 rowcount = 0;
95 }
96 }
97
98
99
100
101
102
103
104 protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
105 throws IOException {
106 Method m = null;
107 try {
108 m = context.getClass().getMethod("getCounter",
109 new Class [] {String.class, String.class});
110 } catch (SecurityException e) {
111 throw new IOException("Failed test for getCounter", e);
112 } catch (NoSuchMethodException e) {
113
114 }
115 return m;
116 }
117
118
119
120
121
122
123 public void setHTable(HTable htable) {
124 Configuration conf = htable.getConfiguration();
125 logScannerActivity = conf.getBoolean(
126 ScannerCallable.LOG_SCANNER_ACTIVITY, false);
127 logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
128 this.htable = htable;
129 }
130
131
132
133
134
135
136 public void setScan(Scan scan) {
137 this.scan = scan;
138 }
139
140
141
142
143
144
145 public void initialize(InputSplit inputsplit,
146 TaskAttemptContext context) throws IOException,
147 InterruptedException {
148 if (context != null) {
149 this.context = context;
150 getCounter = retrieveGetCounterWithStringsParams(context);
151 }
152 restart(scan.getStartRow());
153 }
154
155
156
157
158
159
160 public void close() {
161 this.scanner.close();
162 try {
163 this.htable.close();
164 } catch (IOException ioe) {
165 LOG.warn("Error closing table", ioe);
166 }
167 }
168
169
170
171
172
173
174
175
176 public ImmutableBytesWritable getCurrentKey() throws IOException,
177 InterruptedException {
178 return key;
179 }
180
181
182
183
184
185
186
187
188 public Result getCurrentValue() throws IOException, InterruptedException {
189 return value;
190 }
191
192
193
194
195
196
197
198
199
200 public boolean nextKeyValue() throws IOException, InterruptedException {
201 if (key == null) key = new ImmutableBytesWritable();
202 if (value == null) value = new Result();
203 try {
204 try {
205 value = this.scanner.next();
206 if (logScannerActivity) {
207 rowcount ++;
208 if (rowcount >= logPerRowCount) {
209 long now = System.currentTimeMillis();
210 LOG.info("Mapper took " + (now-timestamp)
211 + "ms to process " + rowcount + " rows");
212 timestamp = now;
213 rowcount = 0;
214 }
215 }
216 } catch (IOException e) {
217
218
219 LOG.info("recovered from " + StringUtils.stringifyException(e));
220 if (lastSuccessfulRow == null) {
221 LOG.warn("We are restarting the first next() invocation," +
222 " if your mapper has restarted a few other times like this" +
223 " then you should consider killing this job and investigate" +
224 " why it's taking so long.");
225 }
226 if (lastSuccessfulRow == null) {
227 restart(scan.getStartRow());
228 } else {
229 restart(lastSuccessfulRow);
230 scanner.next();
231 }
232 value = scanner.next();
233 numRestarts++;
234 }
235 if (value != null && value.size() > 0) {
236 key.set(value.getRow());
237 lastSuccessfulRow = key.get();
238 return true;
239 }
240
241 updateCounters();
242 return false;
243 } catch (IOException ioe) {
244 if (logScannerActivity) {
245 long now = System.currentTimeMillis();
246 LOG.info("Mapper took " + (now-timestamp)
247 + "ms to process " + rowcount + " rows");
248 LOG.info(ioe);
249 String lastRow = lastSuccessfulRow == null ?
250 "null" : Bytes.toStringBinary(lastSuccessfulRow);
251 LOG.info("lastSuccessfulRow=" + lastRow);
252 }
253 throw ioe;
254 }
255 }
256
257
258
259
260
261
262
263
264 private void updateCounters() throws IOException {
265 byte[] serializedMetrics = currentScan.getAttribute(
266 Scan.SCAN_ATTRIBUTES_METRICS_DATA);
267 if (serializedMetrics == null || serializedMetrics.length == 0 ) {
268 return;
269 }
270
271 ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);
272
273 updateCounters(scanMetrics, numRestarts, getCounter, context);
274 }
275
276 protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
277 Method getCounter, TaskAttemptContext context) {
278
279 if (getCounter == null) {
280 return;
281 }
282
283 try {
284 for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
285 Counter ct = (Counter)getCounter.invoke(context,
286 HBASE_COUNTER_GROUP_NAME, entry.getKey());
287
288 ct.increment(entry.getValue());
289 }
290 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
291 "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
292 } catch (Exception e) {
293 LOG.debug("can't update counter." + StringUtils.stringifyException(e));
294 }
295 }
296
297
298
299
300
301
302 public float getProgress() {
303
304 return 0;
305 }
306
307 }