/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.JarFinder;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

import com.google.protobuf.InvalidProtocolBufferException;
import com.yammer.metrics.core.MetricsRegistry;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableMapReduceUtil {
  static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
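
  /*
   * Example driver setup (a minimal sketch; the table name "mytable" and the
   * MyMapper class are hypothetical placeholders):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = new Job(conf, "ExampleRead");
   *   job.setJarByClass(MyMapper.class);
   *   Scan scan = new Scan();
   *   scan.setCaching(500);        // larger scanner caching for MR jobs
   *   scan.setCacheBlocks(false);  // don't fill the region server block cache
   *   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
   *       Text.class, IntWritable.class, job);
   *   job.waitForCompletion(true);
   */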

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @param inputFormatClass the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @param inputFormatClass  The class of the input format.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any off-heap or bucket cache
   * configured for region servers is reset to defaults, since a map-reduce
   * task that reads HFiles directly should not allocate large caches of its own.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat("hbase.offheapcache.percentage", 0f);
    conf.setFloat("hbase.bucketcache.size", 0f);
    conf.unset("hbase.bucketcache.ioengine");
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from the snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current
   *   user should have write permissions to this directory, and it should not be a
   *   subdirectory of rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    addDependencyJars(job.getConfiguration(), MetricsRegistry.class);
    resetCacheConfig(job.getConfiguration());
  }
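
  /*
   * Example (a sketch; "mysnapshot", MySnapshotMapper and the restore path are
   * hypothetical placeholders). The restore directory must be writable by the
   * current user and must not live under the HBase root directory:
   *
   *   TableMapReduceUtil.initTableSnapshotMapperJob("mysnapshot", scan,
   *       MySnapshotMapper.class, Text.class, IntWritable.class, job, true,
   *       new Path("/tmp/snapshot_restore"));
   */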

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *   all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *   all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *   configured job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *   all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *   configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<String>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Obtains authentication tokens on behalf of the current user and adds them
   * to the credentials of the given job. Handles both a secure Hadoop cluster
   * (delegation token file) and a secure HBase cluster, including the remote
   * cluster named by {@link TableOutputFormat#QUORUM_ADDRESS} if one is configured.
   */
  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from the launcher job to the MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for the remote cluster, if one is configured
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
          ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
          HConnection peerConn = HConnectionManager.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        // init credentials for the cluster designated in the job configuration
        HConnection conn = HConnectionManager.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
        HConnection peerConn = HConnectionManager.createConnection(peerConf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // re-assert the interrupt status rather than clearing it, as in initCredentials above
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan  The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64  The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  static Scan convertStringToScan(String base64) throws IOException {
    byte [] decoded = Base64.decode(base64);
    ClientProtos.Scan scan;
    try {
      scan = ClientProtos.Scan.parseFrom(decoded);
    } catch (InvalidProtocolBufferException ipbe) {
      throw new IOException(ipbe);
    }

    return ProtobufUtil.toScan(scan);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   *   output to the cluster that is designated in <code>hbase-site.xml</code>.
   *   Set this String to the zookeeper ensemble of an alternate remote cluster
   *   when you would have the reduce write to a cluster other than the default;
   *   e.g. when copying tables between clusters, the source would be designated
   *   by <code>hbase-site.xml</code> and this param would have the ensemble
   *   address of the remote cluster. The format to pass is
   *   <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   *   such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   *   carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   *   output to the cluster that is designated in <code>hbase-site.xml</code>.
   *   Set this String to the zookeeper ensemble of an alternate remote cluster
   *   when you would have the reduce write to a cluster other than the default;
   *   e.g. when copying tables between clusters, the source would be designated
   *   by <code>hbase-site.xml</code> and this param would have the ensemble
   *   address of the remote cluster. The format to pass is
   *   <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   *   such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *   job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKUtil.transformClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaReader.getRegionCount(conf, table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }
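
  /*
   * Example (a sketch; "target_table" and MyTableReducer are hypothetical
   * placeholders). The reducer should emit an ImmutableBytesWritable row key
   * and a Put (or Delete) as the value:
   *
   *   TableMapReduceUtil.initTableReducerJob("target_table",
   *       MyTableReducer.class, job);
   */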

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    int regions = MetaReader.getRegionCount(job.getConfiguration(), table);
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    job.setNumReduceTasks(MetaReader.getRegionCount(job.getConfiguration(), table));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   *   iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJars(conf,
      // explicitly pull a class from each module
      org.apache.hadoop.hbase.HConstants.class,
      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class,
      org.apache.hadoop.hbase.client.Put.class,
      org.apache.hadoop.hbase.CompatibilityFactory.class,
      org.apache.hadoop.hbase.mapreduce.TableMapper.class,
      // pull necessary dependencies
      org.apache.zookeeper.ZooKeeper.class,
      org.jboss.netty.channel.ChannelFactory.class,
      com.google.protobuf.Message.class,
      com.google.common.collect.Lists.class,
      org.cloudera.htrace.Trace.class,
      org.cliffc.high_scale_lib.Counter.class);
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value
   * in {@code conf}. Entries in "tmpjars" are URIs; only the path component is
   * kept, joined with the platform path separator.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
    if (paths.size() == 0) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
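
  /*
   * A typical use from a launcher shell (a sketch; assumes the `hbase mapredcp`
   * command, which prints a dependency classpath, is available in this
   * deployment):
   *
   *   export HADOOP_CLASSPATH="$(hbase mapredcp):$HADOOP_CLASSPATH"
   */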

  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJars(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          // pull job classes
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   */
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<String>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<String, String>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
            + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }
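
  /*
   * Example (a sketch; MyCustomFilter stands in for any user class, such as a
   * custom filter or comparator, that map or reduce tasks will need on their
   * classpath):
   *
   *   TableMapReduceUtil.addDependencyJars(job.getConfiguration(), MyCustomFilter.class);
   */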

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class
   * is in a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   * @throws IOException
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
  throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   * @param jar The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses)
      throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that
   * has a class with the same name. Looks first on the classpath and then in
   * the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's, which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar file is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a JarFinder implementation. Useful for some job
   * configuration contexts (HBASE-8140) and also for testing on MRv2. First
   * check if we have HADOOP-9426. Lacking that, fall back to the backport.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
    Class<?> jarFinder = null;
    try {
      LOG.debug("Looking for " + hadoopJarFinder + ".");
      jarFinder = Class.forName(hadoopJarFinder);
      LOG.debug(hadoopJarFinder + " found.");
      Method getJar = jarFinder.getMethod("getJar", Class.class);
      ret = (String) getJar.invoke(null, my_class);
    } catch (ClassNotFoundException e) {
      LOG.debug("Using backported JarFinder.");
      ret = JarFinder.getJar(my_class);
    } catch (InvocationTargetException e) {
      // function was properly called, but threw its own exception
      throw new RuntimeException(e.getCause());
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }
}