View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.lang.reflect.Constructor;
22  import java.lang.reflect.InvocationTargetException;
23  import java.lang.reflect.Method;
24  
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.mapred.JobConf;
27  import org.apache.hadoop.mapred.MiniMRCluster;
28  import org.apache.hadoop.mapreduce.Job;
29  import org.apache.hadoop.mapreduce.JobContext;
30  import org.apache.hadoop.mapreduce.JobID;
31  
32  /**
33   * This class provides shims for HBase to interact with the Hadoop 1.0.x and the
34   * Hadoop 0.23.x series.
35   * 
36   * NOTE: No testing done against 0.22.x, or 0.21.x.
37   */
38  abstract public class MapreduceTestingShim {
39    private static MapreduceTestingShim instance;
40    private static Class[] emptyParam = new Class[] {};
41  
42    static {
43      try {
44        // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
45        Class c = Class
46            .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
47        instance = new MapreduceV2Shim();
48      } catch (Exception e) {
49        instance = new MapreduceV1Shim();
50      }
51    }
52  
53    abstract public JobContext newJobContext(Configuration jobConf)
54        throws IOException;
55  
56    abstract public Job newJob(Configuration conf) throws IOException;
57    
58    abstract public JobConf obtainJobConf(MiniMRCluster cluster);
59  
60    abstract public String obtainMROutputDirProp();
61    
62    public static JobContext createJobContext(Configuration jobConf)
63        throws IOException {
64      return instance.newJobContext(jobConf);
65    }
66    
67    public static JobConf getJobConf(MiniMRCluster cluster) {
68      return instance.obtainJobConf(cluster);
69    }
70  
71    public static Job createJob(Configuration conf) throws IOException {
72      return instance.newJob(conf);
73    }
74  
75    public static String getMROutputDirProp() {
76      return instance.obtainMROutputDirProp();
77    }
78    
79    private static class MapreduceV1Shim extends MapreduceTestingShim {
80      public JobContext newJobContext(Configuration jobConf) throws IOException {
81        // Implementing:
82        // return new JobContext(jobConf, new JobID());
83        JobID jobId = new JobID();
84        Constructor<JobContext> c;
85        try {
86          c = JobContext.class.getConstructor(Configuration.class, JobID.class);
87          return c.newInstance(jobConf, jobId);
88        } catch (Exception e) {
89          throw new IllegalStateException(
90              "Failed to instantiate new JobContext(jobConf, new JobID())", e);
91        }
92      }
93  
94      @Override
95      public Job newJob(Configuration conf) throws IOException {
96        // Implementing:
97        // return new Job(conf);
98        Constructor<Job> c;
99        try {
100         c = Job.class.getConstructor(Configuration.class);
101         return c.newInstance(conf);
102       } catch (Exception e) {
103         throw new IllegalStateException(
104             "Failed to instantiate new Job(conf)", e);
105       }
106     }
107     
108     public JobConf obtainJobConf(MiniMRCluster cluster) {
109       if (cluster == null) return null;
110       try {
111         Object runner = cluster.getJobTrackerRunner();
112         Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
113         Object tracker = meth.invoke(runner, new Object []{});
114         Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
115         return (JobConf) m.invoke(tracker, new Object []{});
116       } catch (NoSuchMethodException nsme) {
117         return null;
118       } catch (InvocationTargetException ite) {
119         return null;
120       } catch (IllegalAccessException iae) {
121         return null;
122       }
123     }
124 
125     @Override
126     public String obtainMROutputDirProp() {
127       return "mapred.output.dir";
128     }
129   };
130 
131   private static class MapreduceV2Shim extends MapreduceTestingShim {
132     public JobContext newJobContext(Configuration jobConf) {
133       return newJob(jobConf);
134     }
135 
136     @Override
137     public Job newJob(Configuration jobConf) {
138       // Implementing:
139       // return Job.getInstance(jobConf);
140       try {
141         Method m = Job.class.getMethod("getInstance", Configuration.class);
142         return (Job) m.invoke(null, jobConf); // static method, then arg
143       } catch (Exception e) {
144         e.printStackTrace();
145         throw new IllegalStateException(
146             "Failed to return from Job.getInstance(jobConf)");
147       }
148     }
149     
150     public JobConf obtainJobConf(MiniMRCluster cluster) {
151       try {
152         Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
153         return (JobConf) meth.invoke(cluster, new Object []{});
154       } catch (NoSuchMethodException nsme) {
155         return null;
156       } catch (InvocationTargetException ite) {
157         return null;
158       } catch (IllegalAccessException iae) {
159         return null;
160       }
161     }
162 
163     @Override
164     public String obtainMROutputDirProp() {
165       // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR 
166       // from Hadoop 0.23.x.  If we use the source directly we break the hadoop 1.x compile. 
167       return "mapreduce.output.fileoutputformat.outputdir";
168     }
169   };
170 
171 }