/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.annotations.VisibleForTesting;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
 * bypasses HBase servers and accesses the underlying files (hfiles, recovered edits, hlogs,
 * etc.) directly to provide maximum performance. The snapshot does not need to be restored
 * to the live cluster or cloned. This also allows the MapReduce job to run against an online
 * or offline HBase cluster. The snapshot files can be exported with the {@link ExportSnapshot}
 * tool to a pure-HDFS cluster, and this InputFormat can then be used to run the MapReduce job
 * directly over the exported snapshot files. The snapshot must not be deleted while there are
 * jobs reading from its files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
 *   boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. As with
 * {@link TableInputFormat}, an InputSplit is created per region, and each RecordReader opens
 * its region for reading. An internal RegionScanner is used to execute the {@link Scan}
 * obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read
 * from snapshot files and data files. To read the snapshot files directly from the filesystem,
 * the user running the MR job must have sufficient permissions to access the snapshot and
 * reference files. This means that to run MapReduce over snapshot files, the MR job has to be
 * run as the HBase user, or the user must have group or other privileges on the filesystem
 * (see HBASE-8369). Note that granting other users read access to the snapshot/data files
 * completely circumvents the access control enforced by HBase.
 * @see TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);

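  /**
   * InputSplit that wraps the format-agnostic {@link TableSnapshotInputFormatImpl.InputSplit}
   * and adds the {@link Writable} serialization required by the MapReduce framework.
   */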
  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
        List<String> locations) {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }
  }

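  /**
   * RecordReader that delegates to {@link TableSnapshotInputFormatImpl.RecordReader} and
   * publishes the underlying {@link ScanMetrics} to the MapReduce task counters when available.
   */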
  @VisibleForTesting
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
      new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;
    private Method getCounter;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
      this.context = context;
      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
      delegate.initialize(
        ((TableSnapshotRegionSplit) split).delegate,
        context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
        }
      }

      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<InputSplit>();
    for (TableSnapshotInputFormatImpl.InputSplit split :
      TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of
   * rootdir. After the job is finished, restoreDir can be deleted.
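   * <p>
   * A minimal usage sketch (the snapshot name and restore path are illustrative only; the scan,
   * mapper and output types are configured as usual, e.g. via {@link TableMapReduceUtil}):
   * <pre>{@code
   * Job job = new Job(conf);
   * job.setInputFormatClass(TableSnapshotInputFormat.class);
   * TableSnapshotInputFormat.setInput(job, "mySnapshot", new Path("/tmp/snapshot-restore"));
   * }</pre>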
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }
}