1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28
29 import javax.naming.NamingException;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.classification.InterfaceStability;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.client.HTable;
39 import org.apache.hadoop.hbase.client.Result;
40 import org.apache.hadoop.hbase.client.Scan;
41 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
42 import org.apache.hadoop.hbase.util.Addressing;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.Pair;
45 import org.apache.hadoop.hbase.util.RegionSizeCalculator;
46 import org.apache.hadoop.hbase.util.Strings;
47 import org.apache.hadoop.mapreduce.InputFormat;
48 import org.apache.hadoop.mapreduce.InputSplit;
49 import org.apache.hadoop.mapreduce.JobContext;
50 import org.apache.hadoop.mapreduce.RecordReader;
51 import org.apache.hadoop.mapreduce.TaskAttemptContext;
52 import org.apache.hadoop.net.DNS;
53 import org.apache.hadoop.util.StringUtils;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 @InterfaceAudience.Public
89 @InterfaceStability.Stable
90 public abstract class TableInputFormatBase
91 extends InputFormat<ImmutableBytesWritable, Result> {
92
93
94 public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
95
96
97 public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
98 ".maxskewratio";
99
100
101 public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
102
103 final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
104
105
106 private Scan scan = null;
107
108 private HTable table = null;
109
110 private TableRecordReader tableRecordReader = null;
111
112
113 private HashMap<InetAddress, String> reverseDNSCacheMap =
114 new HashMap<InetAddress, String>();
115
116
117 private String nameServer = null;
118
119
120
121
122
123
124
125
126
127
128
129
130
131 @Override
132 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
133 InputSplit split, TaskAttemptContext context)
134 throws IOException {
135 if (table == null) {
136 throw new IOException("Cannot create a record reader because of a" +
137 " previous error. Please look at the previous logs lines from" +
138 " the task's full log for more details.");
139 }
140 TableSplit tSplit = (TableSplit) split;
141 LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
142 TableRecordReader trr = this.tableRecordReader;
143
144 if (trr == null) {
145 trr = new TableRecordReader();
146 }
147 Scan sc = new Scan(this.scan);
148 sc.setStartRow(tSplit.getStartRow());
149 sc.setStopRow(tSplit.getEndRow());
150 trr.setScan(sc);
151 trr.setHTable(table);
152 return trr;
153 }
154
155 protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
156 return table.getStartEndKeys();
157 }
158
159
160
161
162
163
164
165
166
167
168
169 @Override
170 public List<InputSplit> getSplits(JobContext context) throws IOException {
171 if (table == null) {
172 throw new IOException("No table was provided.");
173 }
174
175 this.nameServer =
176 context.getConfiguration().get("hbase.nameserver.address", null);
177
178 RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);
179
180
181 Pair<byte[][], byte[][]> keys = getStartEndKeys();
182 if (keys == null || keys.getFirst() == null ||
183 keys.getFirst().length == 0) {
184 HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
185 if (null == regLoc) {
186 throw new IOException("Expecting at least one region.");
187 }
188 List<InputSplit> splits = new ArrayList<InputSplit>(1);
189 long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
190 TableSplit split = new TableSplit(table.getName(),
191 HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
192 .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
193 splits.add(split);
194 return splits;
195 }
196 List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
197 for (int i = 0; i < keys.getFirst().length; i++) {
198 if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
199 continue;
200 }
201 HRegionLocation location = table.getRegionLocation(keys.getFirst()[i], false);
202
203 InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
204 if (isa.isUnresolved()) {
205 LOG.warn("Failed resolve " + isa);
206 }
207 InetAddress regionAddress = isa.getAddress();
208 String regionLocation;
209 try {
210 regionLocation = reverseDNS(regionAddress);
211 } catch (NamingException e) {
212 LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
213 regionLocation = location.getHostname();
214 }
215
216 byte[] startRow = scan.getStartRow();
217 byte[] stopRow = scan.getStopRow();
218
219 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
220 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
221 (stopRow.length == 0 ||
222 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
223 byte[] splitStart = startRow.length == 0 ||
224 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
225 keys.getFirst()[i] : startRow;
226 byte[] splitStop = (stopRow.length == 0 ||
227 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
228 keys.getSecond()[i].length > 0 ?
229 keys.getSecond()[i] : stopRow;
230
231 byte[] regionName = location.getRegionInfo().getRegionName();
232 long regionSize = sizeCalculator.getRegionSize(regionName);
233 TableSplit split = new TableSplit(table.getName(),
234 splitStart, splitStop, regionLocation, regionSize);
235 splits.add(split);
236 if (LOG.isDebugEnabled()) {
237 LOG.debug("getSplits: split -> " + i + " -> " + split);
238 }
239 }
240 }
241
242 boolean enableAutoBalance = context.getConfiguration().getBoolean(
243 MAPREDUCE_INPUT_AUTOBALANCE, false);
244 if (enableAutoBalance) {
245 long totalRegionSize=0;
246 for (int i = 0; i < splits.size(); i++){
247 TableSplit ts = (TableSplit)splits.get(i);
248 totalRegionSize += ts.getLength();
249 }
250 long averageRegionSize = totalRegionSize / splits.size();
251
252 if (averageRegionSize <= 0) {
253 LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
254 "set it to 1.");
255 averageRegionSize = 1;
256 }
257 return calculateRebalancedSplits(splits, context, averageRegionSize);
258 } else {
259 return splits;
260 }
261 }
262
263 public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
264 String hostName = this.reverseDNSCacheMap.get(ipAddress);
265 if (hostName == null) {
266 String ipAddressString = null;
267 try {
268 ipAddressString = DNS.reverseDns(ipAddress, null);
269 } catch (Exception e) {
270
271
272
273 ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
274 }
275 if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
276 hostName = Strings.domainNamePointerToHostName(ipAddressString);
277 this.reverseDNSCacheMap.put(ipAddress, hostName);
278 }
279 return hostName;
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295 public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
296 long average) throws IOException {
297 List<InputSplit> resultList = new ArrayList<InputSplit>();
298 Configuration conf = context.getConfiguration();
299
300 long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
301
302 boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
303 long dataSkewThreshold = dataSkewRatio * average;
304 int count = 0;
305 while (count < list.size()) {
306 TableSplit ts = (TableSplit)list.get(count);
307 String regionLocation = ts.getRegionLocation();
308 long regionSize = ts.getLength();
309 if (regionSize >= dataSkewThreshold) {
310
311
312 byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
313
314
315 TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
316 regionSize / 2);
317 TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
318 regionSize - regionSize / 2);
319 resultList.add(t1);
320 resultList.add(t2);
321 count++;
322 } else if (regionSize >= average) {
323
324
325 resultList.add(ts);
326 count++;
327 } else {
328
329
330 long totalSize = regionSize;
331 byte[] splitStartKey = ts.getStartRow();
332 byte[] splitEndKey = ts.getEndRow();
333 count++;
334 for (; count < list.size(); count++) {
335 TableSplit nextRegion = (TableSplit)list.get(count);
336 long nextRegionSize = nextRegion.getLength();
337 if (totalSize + nextRegionSize <= dataSkewThreshold) {
338 totalSize = totalSize + nextRegionSize;
339 splitEndKey = nextRegion.getEndRow();
340 } else {
341 break;
342 }
343 }
344 TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
345 regionLocation, totalSize);
346 resultList.add(t);
347 }
348 }
349 return resultList;
350 }
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368 public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
369 byte upperLimitByte;
370 byte lowerLimitByte;
371
372 if (isText) {
373
374
375 upperLimitByte = '~';
376 lowerLimitByte = ' ';
377 } else {
378 upperLimitByte = Byte.MAX_VALUE;
379 lowerLimitByte = Byte.MIN_VALUE;
380 }
381
382
383
384 if (start.length == 0 && end.length == 0){
385 return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
386 }
387 if (start.length == 0 && end.length != 0){
388 return new byte[]{ end[0] };
389 }
390 if (start.length != 0 && end.length == 0){
391 byte[] result =new byte[start.length];
392 result[0]=start[0];
393 for (int k = 1; k < start.length; k++){
394 result[k] = upperLimitByte;
395 }
396 return result;
397 }
398
399 List<Byte> resultBytesList = new ArrayList<Byte>();
400 int maxLength = start.length > end.length ? start.length : end.length;
401 for (int i = 0; i < maxLength; i++) {
402
403
404
405
406 if (start[i] == end[i]) {
407 resultBytesList.add(start[i]);
408
409 if (i + 1 == start.length) {
410 resultBytesList.add((byte) ((lowerLimitByte + end[i + 1]) / 2));
411 break;
412 }
413 } else {
414
415
416 if ((int)end[i] - (int)start[i] == 1) {
417
418 byte startNextByte = (i + 1 < start.length) ? start[i + 1] : lowerLimitByte;
419 byte endNextByte = (i + 1 < end.length) ? end[i + 1] : lowerLimitByte;
420 int byteRange = (upperLimitByte - startNextByte) + (endNextByte - lowerLimitByte) + 1;
421 int halfRange = byteRange / 2;
422 if ((int)startNextByte + halfRange > (int)upperLimitByte) {
423 resultBytesList.add(end[i]);
424 resultBytesList.add((byte) (startNextByte + halfRange - upperLimitByte +
425 lowerLimitByte));
426 } else {
427 resultBytesList.add(start[i]);
428 resultBytesList.add((byte) (startNextByte + halfRange));
429 }
430 } else {
431
432
433 resultBytesList.add((byte) ((start[i] + end[i]) / 2));
434 }
435 break;
436 }
437 }
438
439 byte result[] = new byte[resultBytesList.size()];
440 for (int k = 0; k < resultBytesList.size(); k++) {
441 result[k] = (byte) resultBytesList.get(k);
442 }
443 return result;
444 }
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
471 return true;
472 }
473
474
475
476
477 protected HTable getHTable() {
478 return this.table;
479 }
480
481
482
483
484
485
486 protected void setHTable(HTable table) {
487 this.table = table;
488 }
489
490
491
492
493
494
495 public Scan getScan() {
496 if (this.scan == null) this.scan = new Scan();
497 return scan;
498 }
499
500
501
502
503
504
505 public void setScan(Scan scan) {
506 this.scan = scan;
507 }
508
509
510
511
512
513
514
515 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
516 this.tableRecordReader = tableRecordReader;
517 }
518 }