1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.text.ParseException;
22 import java.text.SimpleDateFormat;
23 import java.util.Map;
24 import java.util.TreeMap;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.conf.Configured;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.client.Delete;
35 import org.apache.hadoop.hbase.client.HTable;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
39 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
40 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.mapreduce.Job;
43 import org.apache.hadoop.mapreduce.Mapper;
44 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
45 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
46 import org.apache.hadoop.util.GenericOptionsParser;
47 import org.apache.hadoop.util.Tool;
48 import org.apache.hadoop.util.ToolRunner;
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Public
61 @InterfaceStability.Stable
62 public class WALPlayer extends Configured implements Tool {
63 final static String NAME = "WALPlayer";
64 final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
65 final static String HLOG_INPUT_KEY = "hlog.input.dir";
66 final static String TABLES_KEY = "hlog.input.tables";
67 final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
68
69
70
71
72
73 static class HLogKeyValueMapper
74 extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
75 private byte[] table;
76
77 @Override
78 public void map(HLogKey key, WALEdit value,
79 Context context)
80 throws IOException {
81 try {
82
83 if (Bytes.equals(table, key.getTablename().getName())) {
84 for (KeyValue kv : value.getKeyValues()) {
85 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
86 context.write(new ImmutableBytesWritable(kv.getRow()), kv);
87 }
88 }
89 } catch (InterruptedException e) {
90 e.printStackTrace();
91 }
92 }
93
94 @Override
95 public void setup(Context context) throws IOException {
96
97 String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
98 if (tables == null || tables.length != 1) {
99
100 throw new IOException("Exactly one table must be specified for bulk HFile case.");
101 }
102 table = Bytes.toBytes(tables[0]);
103 }
104 }
105
106
107
108
109
110 static class HLogMapper
111 extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
112 private Map<TableName, TableName> tables =
113 new TreeMap<TableName, TableName>();
114
115 @Override
116 public void map(HLogKey key, WALEdit value,
117 Context context)
118 throws IOException {
119 try {
120 if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
121 TableName targetTable = tables.isEmpty() ?
122 key.getTablename() :
123 tables.get(key.getTablename());
124 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
125 Put put = null;
126 Delete del = null;
127 KeyValue lastKV = null;
128 for (KeyValue kv : value.getKeyValues()) {
129
130 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
131
132
133
134
135
136 if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
137
138 if (put != null) context.write(tableOut, put);
139 if (del != null) context.write(tableOut, del);
140
141 if (kv.isDelete()) {
142 del = new Delete(kv.getRow());
143 } else {
144 put = new Put(kv.getRow());
145 }
146 }
147 if (kv.isDelete()) {
148 del.addDeleteMarker(kv);
149 } else {
150 put.add(kv);
151 }
152 lastKV = kv;
153 }
154
155 if (put != null) context.write(tableOut, put);
156 if (del != null) context.write(tableOut, del);
157 }
158 } catch (InterruptedException e) {
159 e.printStackTrace();
160 }
161 }
162
163 @Override
164 public void setup(Context context) throws IOException {
165 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
166 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
167 if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
168
169 throw new IOException("No tables or incorrect table mapping specified.");
170 }
171 int i = 0;
172 for (String table : tablesToUse) {
173 tables.put(TableName.valueOf(table),
174 TableName.valueOf(tableMap[i++]));
175 }
176 }
177 }
178
179
180
181
  /**
   * Creates a new WALPlayer backed by the given configuration.
   * @param conf The configuration to use (typically from HBaseConfiguration.create()).
   */
  public WALPlayer(Configuration conf) {
    super(conf);
  }
185
186 void setupTime(Configuration conf, String option) throws IOException {
187 String val = conf.get(option);
188 if (val == null) return;
189 long ms;
190 try {
191
192 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
193 } catch (ParseException pe) {
194 try {
195
196 ms = Long.parseLong(val);
197 } catch (NumberFormatException nfe) {
198 throw new IOException(option
199 + " must be specified either in the form 2001-02-20T16:35:06.99 "
200 + "or as number of milliseconds");
201 }
202 }
203 conf.setLong(option, ms);
204 }
205
206
207
208
209
210
211
212
213 public Job createSubmittableJob(String[] args)
214 throws IOException {
215 Configuration conf = getConf();
216 setupTime(conf, HLogInputFormat.START_TIME_KEY);
217 setupTime(conf, HLogInputFormat.END_TIME_KEY);
218 Path inputDir = new Path(args[0]);
219 String[] tables = args[1].split(",");
220 String[] tableMap;
221 if (args.length > 2) {
222 tableMap = args[2].split(",");
223 if (tableMap.length != tables.length) {
224 throw new IOException("The same number of tables and mapping must be provided.");
225 }
226 } else {
227
228 tableMap = tables;
229 }
230 conf.setStrings(TABLES_KEY, tables);
231 conf.setStrings(TABLE_MAP_KEY, tableMap);
232 Job job = new Job(conf, NAME + "_" + inputDir);
233 job.setJarByClass(WALPlayer.class);
234 FileInputFormat.setInputPaths(job, inputDir);
235 job.setInputFormatClass(HLogInputFormat.class);
236 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
237 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
238 if (hfileOutPath != null) {
239
240 if (tables.length != 1) {
241 throw new IOException("Exactly one table must be specified for the bulk export option");
242 }
243 HTable table = new HTable(conf, tables[0]);
244 job.setMapperClass(HLogKeyValueMapper.class);
245 job.setReducerClass(KeyValueSortReducer.class);
246 Path outputDir = new Path(hfileOutPath);
247 FileOutputFormat.setOutputPath(job, outputDir);
248 job.setMapOutputValueClass(KeyValue.class);
249 HFileOutputFormat.configureIncrementalLoad(job, table);
250 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
251 com.google.common.base.Preconditions.class);
252 } else {
253
254 job.setMapperClass(HLogMapper.class);
255 job.setOutputFormatClass(MultiTableOutputFormat.class);
256 TableMapReduceUtil.addDependencyJars(job);
257 TableMapReduceUtil.initCredentials(job);
258
259 job.setNumReduceTasks(0);
260 }
261 return job;
262 }
263
264
265
266
267 private void usage(final String errorMsg) {
268 if (errorMsg != null && errorMsg.length() > 0) {
269 System.err.println("ERROR: " + errorMsg);
270 }
271 System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
272 System.err.println("Read all WAL entries for <tables>.");
273 System.err.println("If no tables (\"\") are specific, all tables are imported.");
274 System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
275 System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
276 System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
277 System.err.println("<tableMapping> is a command separated list of targettables.");
278 System.err.println("If specified, each table in <tables> must have a mapping.\n");
279 System.err.println("By default " + NAME + " will load data directly into HBase.");
280 System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
281 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
282 System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
283 System.err.println("Other options: (specify time range to WAL edit to consider)");
284 System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
285 System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
286 System.err.println("For performance also consider the following options:\n"
287 + " -Dmapred.map.tasks.speculative.execution=false\n"
288 + " -Dmapred.reduce.tasks.speculative.execution=false");
289 }
290
291
292
293
294
295
296
297 public static void main(String[] args) throws Exception {
298 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
299 System.exit(ret);
300 }
301
302 @Override
303 public int run(String[] args) throws Exception {
304 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
305 if (otherArgs.length < 2) {
306 usage("Wrong number of arguments: " + otherArgs.length);
307 System.exit(-1);
308 }
309 Job job = createSubmittableJob(otherArgs);
310 return job.waitForCompletion(true) ? 0 : 1;
311 }
312 }