1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.io.UnsupportedEncodingException;
22  import java.net.URLDecoder;
23  import java.net.URLEncoder;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  import java.util.UUID;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.KeyValueUtil;
46  import org.apache.hadoop.hbase.client.HTable;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
49  import org.apache.hadoop.hbase.io.compress.Compression;
50  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
51  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
52  import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
53  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
54  import org.apache.hadoop.hbase.io.hfile.HFileContext;
55  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
56  import org.apache.hadoop.hbase.regionserver.BloomType;
57  import org.apache.hadoop.hbase.regionserver.HStore;
58  import org.apache.hadoop.hbase.regionserver.StoreFile;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.io.NullWritable;
61  import org.apache.hadoop.io.SequenceFile;
62  import org.apache.hadoop.io.Text;
63  import org.apache.hadoop.mapreduce.Job;
64  import org.apache.hadoop.mapreduce.OutputFormat;
65  import org.apache.hadoop.mapreduce.RecordWriter;
66  import org.apache.hadoop.mapreduce.TaskAttemptContext;
67  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
68  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
69  import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
70  
71  import com.google.common.annotations.VisibleForTesting;
72  
73  /**
74   * Writes HFiles. Passed Cells must arrive in order.
75   * Writes current time as the sequence id for the file. Sets the major compacted
76   * attribute on created hfiles. Calling write(null,null) will forcibly roll
77   * all HFiles being written.
78   * <p>
79   * Using this class as part of a MapReduce job is best done
80   * using {@link #configureIncrementalLoad(Job, HTable)}.
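     * <p>
     * For illustration, a minimal driver sketch (imports omitted; the mapper class,
     * table name, and paths below are placeholders, not part of this class):
     * <pre>{@code
     * Configuration conf = HBaseConfiguration.create();
     * Job job = Job.getInstance(conf, "prepare-bulk-load");
     * job.setJarByClass(MyMapper.class);
     * job.setMapperClass(MyMapper.class);   // emits ImmutableBytesWritable / Put pairs
     * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     * job.setMapOutputValueClass(Put.class);
     * FileInputFormat.addInputPath(job, new Path("/input"));
     * FileOutputFormat.setOutputPath(job, new Path("/hfile-output"));
     * HTable table = new HTable(conf, "mytable");
     * HFileOutputFormat2.configureIncrementalLoad(job, table);
     * job.waitForCompletion(true);
     * }</pre>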
81   */
82  @InterfaceAudience.Public
83  @InterfaceStability.Evolving
84  public class HFileOutputFormat2
85      extends FileOutputFormat<ImmutableBytesWritable, Cell> {
86    static Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
87  
88    // The following constants are private since these are used by
89    // HFileOutputFormat2 to internally transfer data between job setup and
90    // reducer run using conf.
91    // These should not be changed by the client.
92    private static final String COMPRESSION_FAMILIES_CONF_KEY =
93        "hbase.hfileoutputformat.families.compression";
94    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
95        "hbase.hfileoutputformat.families.bloomtype";
96    private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
97        "hbase.mapreduce.hfileoutputformat.blocksize";
98    private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
99        "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
100 
101   // This constant is public because clients may set it on their conf object
102   // when configuring a job, and so need to refer to this symbol.
103   // It is present for backwards compatibility reasons. Use it only to
104   // override the auto-detection of datablock encoding.
105   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
106       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
107 
108   @Override
109   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
110       final TaskAttemptContext context) throws IOException, InterruptedException {
111     return createRecordWriter(context);
112   }
113 
114   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
115       createRecordWriter(final TaskAttemptContext context)
116           throws IOException, InterruptedException {
117 
118     // Get the path of the temporary output file
119     final Path outputPath = FileOutputFormat.getOutputPath(context);
120     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
121     final Configuration conf = context.getConfiguration();
122     final FileSystem fs = outputdir.getFileSystem(conf);
123     // These configs. are from hbase-*.xml
124     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
125         HConstants.DEFAULT_MAX_FILE_SIZE);
126     // Invented config.  Add to hbase-*.xml if other than default compression.
127     final String defaultCompressionStr = conf.get("hfile.compression",
128         Compression.Algorithm.NONE.getName());
129     final Algorithm defaultCompression = AbstractHFileWriter
130         .compressionByName(defaultCompressionStr);
131     final boolean compactionExclude = conf.getBoolean(
132         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
133 
134     // create a map from column family to the compression algorithm
135     final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
136     final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
137     final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
138 
139     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
140     final Map<byte[], DataBlockEncoding> datablockEncodingMap
141         = createFamilyDataBlockEncodingMap(conf);
142     final DataBlockEncoding overriddenEncoding;
143     if (dataBlockEncodingStr != null) {
144       overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
145     } else {
146       overriddenEncoding = null;
147     }
148 
149     return new RecordWriter<ImmutableBytesWritable, V>() {
150       // Map of families to writers and how much has been output on the writer.
151       private final Map<byte [], WriterLength> writers =
152         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
153       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
154       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
155       private boolean rollRequested = false;
156 
157       @Override
158       public void write(ImmutableBytesWritable row, V cell)
159           throws IOException {
160         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
161 
162         // null input == user explicitly wants to flush
163         if (row == null && kv == null) {
164           rollWriters();
165           return;
166         }
167 
168         byte [] rowKey = CellUtil.cloneRow(kv);
169         long length = kv.getLength();
170         byte [] family = CellUtil.cloneFamily(kv);
171         WriterLength wl = this.writers.get(family);
172 
173         // If this is a new column family, verify that the directory exists
174         if (wl == null) {
175           fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
176         }
177 
178         // If any of the HFiles for the column families has reached
179         // maxsize, we need to roll all the writers
180         if (wl != null && wl.written + length >= maxsize) {
181           this.rollRequested = true;
182         }
183 
184       // Rolling can only happen once the current row is finished, though
185         if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
186           rollWriters();
187         }
188 
189       // create a new HFile writer for this family, if necessary
190         if (wl == null || wl.writer == null) {
191           wl = getNewWriter(family, conf);
192         }
193 
194       // we now have the proper HFile writer. full steam ahead
195         kv.updateLatestStamp(this.now);
196         wl.writer.append(kv);
197         wl.written += length;
198 
199       // Remember the row key so we can tell when the row changes.
200         this.previousRow = rowKey;
201       }
202 
203       private void rollWriters() throws IOException {
204         for (WriterLength wl : this.writers.values()) {
205           if (wl.writer != null) {
206             LOG.info("Writer=" + wl.writer.getPath() +
207                 ((wl.written == 0)? "": ", wrote=" + wl.written));
208             close(wl.writer);
209           }
210           wl.writer = null;
211           wl.written = 0;
212         }
213         this.rollRequested = false;
214       }
215 
216       /* Create a new StoreFile.Writer.
217        * @param family column family the new writer is for
218        * @return A WriterLength, containing a new StoreFile.Writer.
219        * @throws IOException
220        */
221       private WriterLength getNewWriter(byte[] family, Configuration conf)
222           throws IOException {
223         WriterLength wl = new WriterLength();
224         Path familydir = new Path(outputdir, Bytes.toString(family));
225         Algorithm compression = compressionMap.get(family);
226         compression = compression == null ? defaultCompression : compression;
227         BloomType bloomType = bloomTypeMap.get(family);
228         bloomType = bloomType == null ? BloomType.NONE : bloomType;
229         Integer blockSize = blockSizeMap.get(family);
230         blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
231         DataBlockEncoding encoding = overriddenEncoding;
232         encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
233         encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
234         Configuration tempConf = new Configuration(conf);
235         tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
236         HFileContextBuilder contextBuilder = new HFileContextBuilder()
237                                     .withCompression(compression)
238                                     .withChecksumType(HStore.getChecksumType(conf))
239                                     .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
240                                     .withBlockSize(blockSize);
241         contextBuilder.withDataBlockEncoding(encoding);
242         HFileContext hFileContext = contextBuilder.build();
243                                     
244         wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
245             .withOutputDir(familydir).withBloomType(bloomType)
246             .withComparator(KeyValue.COMPARATOR)
247             .withFileContext(hFileContext).build();
248 
249         this.writers.put(family, wl);
250         return wl;
251       }
252 
253       private void close(final StoreFile.Writer w) throws IOException {
254         if (w != null) {
255           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
256               Bytes.toBytes(System.currentTimeMillis()));
257           w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
258               Bytes.toBytes(context.getTaskAttemptID().toString()));
259           w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
260               Bytes.toBytes(true));
261           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
262               Bytes.toBytes(compactionExclude));
263           w.appendTrackedTimestampsToMetadata();
264           w.close();
265         }
266       }
267 
268       @Override
269       public void close(TaskAttemptContext c)
270       throws IOException, InterruptedException {
271         for (WriterLength wl: this.writers.values()) {
272           close(wl.writer);
273         }
274       }
275     };
276   }
277 
278   /*
279    * Data structure to hold a Writer and amount of data written on it.
280    */
281   static class WriterLength {
282     long written = 0;
283     StoreFile.Writer writer = null;
284   }
285 
286   /**
287    * Return the start keys of all of the regions in this table,
288    * as a list of ImmutableBytesWritable.
289    */
290   private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
291   throws IOException {
292     byte[][] byteKeys = table.getStartKeys();
293     ArrayList<ImmutableBytesWritable> ret =
294       new ArrayList<ImmutableBytesWritable>(byteKeys.length);
295     for (byte[] byteKey : byteKeys) {
296       ret.add(new ImmutableBytesWritable(byteKey));
297     }
298     return ret;
299   }
300 
301   /**
302    * Write out a {@link SequenceFile} that can be read by
303    * {@link TotalOrderPartitioner} that contains the split points in startKeys.
304    */
305   @SuppressWarnings("deprecation")
306   private static void writePartitions(Configuration conf, Path partitionsPath,
307       List<ImmutableBytesWritable> startKeys) throws IOException {
308     LOG.info("Writing partition information to " + partitionsPath);
309     if (startKeys.isEmpty()) {
310       throw new IllegalArgumentException("No regions passed");
311     }
312 
313     // We're generating a list of split points, and we don't ever
314     // have keys < the first region (which has an empty start key)
315     // so we need to remove it. Otherwise we would end up with an
316     // empty reducer with index 0
317     TreeSet<ImmutableBytesWritable> sorted =
318       new TreeSet<ImmutableBytesWritable>(startKeys);
319 
320     ImmutableBytesWritable first = sorted.first();
321     if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
322       throw new IllegalArgumentException(
323           "First region of table should have empty start key. Instead has: "
324           + Bytes.toStringBinary(first.get()));
325     }
326     sorted.remove(first);
327 
328     // Write the actual file
329     FileSystem fs = partitionsPath.getFileSystem(conf);
330     SequenceFile.Writer writer = SequenceFile.createWriter(
331       fs, conf, partitionsPath, ImmutableBytesWritable.class,
332       NullWritable.class);
333 
334     try {
335       for (ImmutableBytesWritable startKey : sorted) {
336         writer.append(startKey, NullWritable.get());
337       }
338     } finally {
339       writer.close();
340     }
341   }
342 
343   /**
344    * Configure a MapReduce Job to perform an incremental load into the given
345    * table. This
346    * <ul>
347    *   <li>Inspects the table to configure a total order partitioner</li>
348    *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
349    *   <li>Sets the number of reduce tasks to match the current number of regions</li>
350    *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
351    *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
352    *     PutSortReducer)</li>
353    * </ul>
354  * The user should be sure to set the map output value class to KeyValue, Put, or Text before
355  * running this function, since that choice determines which sort reducer is configured.
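     * <p>
     * For example, a job whose mapper emits {@link Put}s would call
     * {@code job.setMapOutputValueClass(Put.class)} before invoking this method.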
356    */
357   public static void configureIncrementalLoad(Job job, HTable table)
358       throws IOException {
359     configureIncrementalLoad(job, table, HFileOutputFormat2.class);
360   }
361 
362   static void configureIncrementalLoad(Job job, HTable table,
363       Class<? extends OutputFormat<?, ?>> cls) throws IOException {
364     Configuration conf = job.getConfiguration();
365 
366     job.setOutputKeyClass(ImmutableBytesWritable.class);
367     job.setOutputValueClass(KeyValue.class);
368     job.setOutputFormatClass(cls);
369 
370     // Based on the configured map output class, set the correct reducer to properly
371     // sort the incoming values.
372     // TODO it would be nice to pick one or the other of these formats.
373     if (KeyValue.class.equals(job.getMapOutputValueClass())) {
374       job.setReducerClass(KeyValueSortReducer.class);
375     } else if (Put.class.equals(job.getMapOutputValueClass())) {
376       job.setReducerClass(PutSortReducer.class);
377     } else if (Text.class.equals(job.getMapOutputValueClass())) {
378       job.setReducerClass(TextSortReducer.class);
379     } else {
380       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
381     }
382 
383     conf.setStrings("io.serializations", conf.get("io.serializations"),
384         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
385         KeyValueSerialization.class.getName());
386 
387     // Use the table's region boundaries as the TotalOrderPartitioner split points.
388     LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
389     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
390     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
391         "to match current region count");
392     job.setNumReduceTasks(startKeys.size());
393 
394     configurePartitioner(job, startKeys);
395     // Set compression algorithms based on column families
396     configureCompression(table, conf);
397     configureBloomType(table, conf);
398     configureBlockSize(table, conf);
399     configureDataBlockEncoding(table, conf);
400 
401     TableMapReduceUtil.addDependencyJars(job);
402     TableMapReduceUtil.initCredentials(job);
403     LOG.info("Incremental table " + Bytes.toString(table.getTableName())
404       + " output configured.");
405   }
406   
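      /**
       * Configures <code>job</code> for writing HFiles much like
       * {@link #configureIncrementalLoad(Job, HTable)}, but only sets the output key/value classes,
       * the output format, and the per-family HFile properties; it does not configure the
       * partitioner, the sort reducer, or the number of reduce tasks.
       */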
407   public static void configureIncrementalLoadMap(Job job, HTable table) throws IOException {
408     Configuration conf = job.getConfiguration();
409 
410     job.setOutputKeyClass(ImmutableBytesWritable.class);
411     job.setOutputValueClass(KeyValue.class);
412     job.setOutputFormatClass(HFileOutputFormat2.class);
413 
414     // Set compression algorithms based on column families
415     configureCompression(table, conf);
416     configureBloomType(table, conf);
417     configureBlockSize(table, conf);
418     configureDataBlockEncoding(table, conf);
419 
420     TableMapReduceUtil.addDependencyJars(job);
421     TableMapReduceUtil.initCredentials(job);
422     LOG.info("Incremental table " + table.getName() + " output configured.");
423   }
424 
425   /**
426    * Runs inside the task to deserialize column family to compression algorithm
427    * map from the configuration.
428    *
429    * @param conf to read the serialized values from
430    * @return a map from column family to the configured compression algorithm
431    */
432   @VisibleForTesting
433   static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
434       conf) {
435     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
436         COMPRESSION_FAMILIES_CONF_KEY);
437     Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
438         Algorithm>(Bytes.BYTES_COMPARATOR);
439     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
440       Algorithm algorithm = AbstractHFileWriter.compressionByName
441           (e.getValue());
442       compressionMap.put(e.getKey(), algorithm);
443     }
444     return compressionMap;
445   }
446 
447   /**
448    * Runs inside the task to deserialize column family to bloom filter type
449    * map from the configuration.
450    *
451    * @param conf to read the serialized values from
452    * @return a map from column family to the configured bloom filter type
453    */
454   @VisibleForTesting
455   static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
456     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
457         BLOOM_TYPE_FAMILIES_CONF_KEY);
458     Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
459         BloomType>(Bytes.BYTES_COMPARATOR);
460     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
461       BloomType bloomType = BloomType.valueOf(e.getValue());
462       bloomTypeMap.put(e.getKey(), bloomType);
463     }
464     return bloomTypeMap;
465   }
466 
467   /**
468    * Runs inside the task to deserialize column family to block size
469    * map from the configuration.
470    *
471    * @param conf to read the serialized values from
472    * @return a map from column family to the configured block size
473    */
474   @VisibleForTesting
475   static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
476     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
477         BLOCK_SIZE_FAMILIES_CONF_KEY);
478     Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
479         Integer>(Bytes.BYTES_COMPARATOR);
480     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
481       Integer blockSize = Integer.parseInt(e.getValue());
482       blockSizeMap.put(e.getKey(), blockSize);
483     }
484     return blockSizeMap;
485   }
486 
487   /**
488    * Runs inside the task to deserialize column family to data block encoding
489    * type map from the configuration.
490    *
491    * @param conf to read the serialized values from
492    * @return a map from column family to HFileDataBlockEncoder for the
493    *         configured data block type for the family
494    */
495   @VisibleForTesting
496   static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
497       Configuration conf) {
498     Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
499         DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
500     Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
501         DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
502     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
503       encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
504     }
505     return encoderMap;
506   }
507 
508 
509   /**
510    * Runs inside the task to deserialize a column family to conf value map from the configuration.
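       * A value of the form {@code f1=v1&f2=v2} (family names and values URL-encoded, pairs
       * joined by {@code &}) is parsed into one map entry per family; the names here are
       * purely illustrative.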
511    *
512    * @param conf to read the serialized values from
513    * @param confName conf key to read from the configuration
514    * @return a map of column family to the given configuration value
515    */
516   private static Map<byte[], String> createFamilyConfValueMap(
517       Configuration conf, String confName) {
518     Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
519     String confVal = conf.get(confName, "");
520     for (String familyConf : confVal.split("&")) {
521       String[] familySplit = familyConf.split("=");
522       if (familySplit.length != 2) {
523         continue;
524       }
525       try {
526         confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
527             URLDecoder.decode(familySplit[1], "UTF-8"));
528       } catch (UnsupportedEncodingException e) {
529         // will not happen with UTF-8 encoding
530         throw new AssertionError(e);
531       }
532     }
533     return confValMap;
534   }
535 
536   /**
537    * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
538    * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
539    */
540   static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
541       throws IOException {
542     Configuration conf = job.getConfiguration();
543     // create the partitions file
544     FileSystem fs = FileSystem.get(conf);
545     Path partitionsPath = new Path(conf.get("hadoop.tmp.dir"), "partitions_" + UUID.randomUUID());
546     partitionsPath = fs.makeQualified(partitionsPath);
547     writePartitions(conf, partitionsPath, splitPoints);
548     fs.deleteOnExit(partitionsPath);
549 
550     // configure job to use it
551     job.setPartitionerClass(TotalOrderPartitioner.class);
552     TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
553   }
554 
555   /**
556    * Serialize column family to compression algorithm map to configuration.
557    * Invoked while configuring the MR job for incremental load.
558    *
559    * @param table to read the properties from
560    * @param conf to persist serialized values into
561    * @throws IOException
562    *           on failure to read column family descriptors
563    */
564   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
565       value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
566   @VisibleForTesting
567   static void configureCompression(
568       HTable table, Configuration conf) throws IOException {
569     StringBuilder compressionConfigValue = new StringBuilder();
570     HTableDescriptor tableDescriptor = table.getTableDescriptor();
571     if(tableDescriptor == null){
572       // could happen with mock table instance
573       return;
574     }
575     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
576     int i = 0;
577     for (HColumnDescriptor familyDescriptor : families) {
578       if (i++ > 0) {
579         compressionConfigValue.append('&');
580       }
581       compressionConfigValue.append(URLEncoder.encode(
582         familyDescriptor.getNameAsString(), "UTF-8"));
583       compressionConfigValue.append('=');
584       compressionConfigValue.append(URLEncoder.encode(
585         familyDescriptor.getCompression().getName(), "UTF-8"));
586     }
587     // Persist the serialized family-to-compression map in the job configuration
588     conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
589   }
590 
591   /**
592    * Serialize column family to block size map to configuration.
593    * Invoked while configuring the MR job for incremental load.
594    *
595    * @param table to read the properties from
596    * @param conf to persist serialized values into
597    * @throws IOException
598    *           on failure to read column family descriptors
599    */
600   @VisibleForTesting
601   static void configureBlockSize(
602       HTable table, Configuration conf) throws IOException {
603     StringBuilder blockSizeConfigValue = new StringBuilder();
604     HTableDescriptor tableDescriptor = table.getTableDescriptor();
605     if (tableDescriptor == null) {
606       // could happen with mock table instance
607       return;
608     }
609     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
610     int i = 0;
611     for (HColumnDescriptor familyDescriptor : families) {
612       if (i++ > 0) {
613         blockSizeConfigValue.append('&');
614       }
615       blockSizeConfigValue.append(URLEncoder.encode(
616           familyDescriptor.getNameAsString(), "UTF-8"));
617       blockSizeConfigValue.append('=');
618       blockSizeConfigValue.append(URLEncoder.encode(
619           String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
620     }
621     // Persist the serialized family-to-block-size map in the job configuration
622     conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
623   }
624 
625   /**
626    * Serialize column family to bloom type map to configuration.
627    * Invoked while configuring the MR job for incremental load.
628    *
629    * @param table to read the properties from
630    * @param conf to persist serialized values into
631    * @throws IOException
632    *           on failure to read column family descriptors
633    */
634   @VisibleForTesting
635   static void configureBloomType(
636       HTable table, Configuration conf) throws IOException {
637     HTableDescriptor tableDescriptor = table.getTableDescriptor();
638     if (tableDescriptor == null) {
639       // could happen with mock table instance
640       return;
641     }
642     StringBuilder bloomTypeConfigValue = new StringBuilder();
643     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
644     int i = 0;
645     for (HColumnDescriptor familyDescriptor : families) {
646       if (i++ > 0) {
647         bloomTypeConfigValue.append('&');
648       }
649       bloomTypeConfigValue.append(URLEncoder.encode(
650         familyDescriptor.getNameAsString(), "UTF-8"));
651       bloomTypeConfigValue.append('=');
652       String bloomType = familyDescriptor.getBloomFilterType().toString();
653       if (bloomType == null) {
654         bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
655       }
656       bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
657     }
658     conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
659   }
660 
661   /**
662    * Serialize column family to data block encoding map to configuration.
663    * Invoked while configuring the MR job for incremental load.
664    *
665    * @param table to read the properties from
666    * @param conf to persist serialized values into
667    * @throws IOException
668    *           on failure to read column family descriptors
669    */
670   @VisibleForTesting
671   static void configureDataBlockEncoding(HTable table,
672       Configuration conf) throws IOException {
673     HTableDescriptor tableDescriptor = table.getTableDescriptor();
674     if (tableDescriptor == null) {
675       // could happen with mock table instance
676       return;
677     }
678     StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
679     Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
680     int i = 0;
681     for (HColumnDescriptor familyDescriptor : families) {
682       if (i++ > 0) {
683         dataBlockEncodingConfigValue.append('&');
684       }
685       dataBlockEncodingConfigValue.append(
686           URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
687       dataBlockEncodingConfigValue.append('=');
688       DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
689       if (encoding == null) {
690         encoding = DataBlockEncoding.NONE;
691       }
692       dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
693           "UTF-8"));
694     }
695     conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
696         dataBlockEncodingConfigValue.toString());
697   }
698 }