/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

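/**
 * Hadoop MR API-agnostic implementation for running MapReduce jobs over table snapshots;
 * the mapred and mapreduce TableSnapshotInputFormat wrappers delegate to this class.
 *
 * A minimal usage sketch (the snapshot name and restore path below are illustrative only):
 * <pre>
 *   Configuration conf = job.getConfiguration();
 *   // restore the snapshot into a scratch directory and record it in the conf
 *   TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/snapshot-restore"));
 *   // one split per region overlapping the configured Scan
 *   List&lt;TableSnapshotInputFormatImpl.InputSplit&gt; splits =
 *       TableSnapshotInputFormatImpl.getSplits(conf);
 * </pre>
 */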
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {

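  // conf key for the name of the snapshot to read from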
  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";

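  // conf key for the directory the snapshot is restored into before scanning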
  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

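  // a split advertises only those hosts carrying at least this fraction of the top host's
  // block weight (see getBestLocations below)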
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

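  /**
   * Snapshot region split carrying the table descriptor, the region to scan, and the
   * preferred block locations; shared by the mapred and mapreduce wrappers.
   */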
  public static class InputSplit implements Writable {
    private HTableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;

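    // no-arg constructor required for Writable deserialization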
    public InputSplit() { }

    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
    }

    public long getLength() {
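      // the split's length is not tracked; the snapshot's file sizes could be summed here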
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public HTableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

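    // serialize as a length-prefixed protobuf TableSnapshotRegionSplit message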
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(htd.convert())
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = HTableDescriptor.convert(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);
    }
  }

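  /**
   * Record-reader logic shared by the mapred and mapreduce wrappers: scans one restored
   * region client-side, without going through a region server.
   */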
  public static class RecordReader {
    InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.split = split;
      HTableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = FSUtils.getCurrentFileSystem(conf);

      // the user-specified root directory the snapshot was restored into
      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY));

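      // create the Scan; the mapred API cannot carry a serialized Scan, so fall back to
      // its column-list property when none is configured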
      if (conf.get(TableInputFormat.SCAN) != null) {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
        String[] columns =
            conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
        scan = new Scan();
        for (String col : columns) {
          scan.addFamily(Bytes.toBytes(col));
        }
      } else {
        throw new IllegalArgumentException("A Scan is not configured for this job");
      }

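      // the restored region is immutable, so an uncommitted-read isolation level is safe
      // and avoids setting up a read point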
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // block caching buys nothing for a one-pass client-side scan
      scan.setCacheBlocks(false);

      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      // no meaningful byte position to report
      return 0;
    }

    public float getProgress() {
      // TODO: progress could be estimated from the bytes scanned so far
      return 0;
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    HBaseProtos.SnapshotDescription snapshotDesc =
        SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);

    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

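    // load the table descriptor from the snapshot manifest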
    HTableDescriptor htd = manifest.getTableDescriptor();

    // extract the Scan the same way initialize() does, so splits and readers agree on
    // which regions overlap the scanned range
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }

    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (SnapshotRegionManifest regionManifest : regionManifests) {
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());

      // emit one split per region whose key range overlaps the scan
      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
          hri.getStartKey(), hri.getEndKey())) {
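        // compute HDFS block locations from the restored snapshot files (this also resolves
        // the locations of referenced hfiles)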
        List<String> hosts = getBestLocations(conf,
            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        // more than three locations gains nothing with current schedulers
        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(htd, hri, hosts));
      }
    }

    return splits;
  }

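  /**
   * Computes the host locations to advertise on a split. MR/Yarn schedulers do not weight
   * locations, so rather than blindly passing every host that holds any block of the
   * region, only the hosts whose cumulative block weight is within
   * LOCALITY_CUTOFF_MULTIPLIER (default 0.8) of the best host's weight are returned.
   */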
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

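    // keep every remaining host carrying at least cutoffMultiplier of the top host's weight;
    // the array is ordered by descending weight, so stop at the first host below the cutoff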
    double cutoffMultiplier
        = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;
    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

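  /**
   * Configures the job to read from the named snapshot.
   *
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory the snapshot is restored into; the current user
   *   needs write permission to it, and it should not be a subdirectory of the HBase root
   *   directory. It can be deleted once the job finishes.
   * @throws IOException if the snapshot cannot be restored
   */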
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    // restore into a unique subdirectory so concurrent jobs do not collide
    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // TODO: the restore could be parallelized across the record readers
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}