/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
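 *
 * <p>A minimal end-to-end sketch of how these helpers are typically wired up.
 * The table names, column list, and the {@code MyJob}, {@code MyTableMap} and
 * {@code MyTableReduce} classes below are illustrative placeholders, not part
 * of this API:
 *
 * <pre>{@code
 * JobConf job = new JobConf(HBaseConfiguration.create(), MyJob.class);
 * job.setJobName("example");
 * // Read every column of the (hypothetical) family "f" in table "mytable".
 * TableMapReduceUtil.initTableMapJob("mytable", "f:", MyTableMap.class,
 *   ImmutableBytesWritable.class, Put.class, job);
 * // Write the reducer's Puts back to the (hypothetical) table "outtable".
 * TableMapReduceUtil.initTableReduceJob("outtable", MyTableReduce.class, job);
 * JobClient.runJob(job);
 * }</pre>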
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
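   * <p>For example (the table name, column list, and mapper class are
   * placeholders; the column list is a space-separated set of
   * "family:qualifier" or "family:" names):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableMapJob("mytable", "f:", MyTableMap.class,
   *   ImmutableBytesWritable.class, Put.class, job);
   * }</pre>
   *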
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

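  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */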
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from the snapshot files.
   *
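   * <p>A rough usage sketch (the snapshot name, column list, and restore path
   * are placeholders):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "f:", MyTableMap.class,
   *   ImmutableBytesWritable.class, Put.class, job, true,
   *   new Path("/tmp/snapshot-restore"));
   * }</pre>
   *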
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of
   * rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
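   * <p>For example (the output table name and reducer class are placeholders):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableReduceJob("outtable", MyTableReduce.class, job);
   * }</pre>
   *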
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

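  /**
   * Obtains the authentication tokens the job needs to talk to a secure
   * cluster and stores them in the job configuration. This is a no-op when
   * neither Hadoop nor HBase security is enabled.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When obtaining the authentication token fails.
   */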
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      HConnection conn = HConnectionManager.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
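   * <p>For example, to fetch and cache 500 rows per scanner call (an arbitrary
   * illustrative value that should be tuned to the row size and available heap):
   *
   * <pre>{@code
   * TableMapReduceUtil.setScannerCaching(job, 500);
   * }</pre>
   *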
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Adds the HBase dependency jars, as well as jars for any of the configured
   * job classes, to the job configuration's distributed cache (tmpjars).
   *
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}