/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * This is used to partition the output keys into groups of keys.
 * Keys are grouped according to the regions that currently exist
 * so that each reducer fills a single region so load is distributed.
 *
 * <p>This class is not suitable as partitioner creating hfiles
 * for incremental bulk loads as region spread will likely change between time of
 * hfile creation and load time. See {@link LoadIncrementalHFiles}
 * and <a href="http://hbase.apache.org/docs/current/bulk-loads.html">Bulk Load</a>.
 *
 * @param <KEY> The type of the key.
 * @param <VALUE> The type of the value.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HRegionPartitioner<KEY, VALUE>
extends Partitioner<ImmutableBytesWritable, VALUE>
implements Configurable {

  // Log against this class; the original mistakenly used TableInputFormat.class,
  // which misattributed all partitioner log output. static final per convention.
  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
  private Configuration conf = null;
  // Connection to the target table; populated in setConf().
  private HTable table;
  // Start key of every region of the table, fetched once in setConf().
  private byte[][] startKeys;

  /**
   * Gets the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key  The key to be partitioned.
   * @param value  The entry value.
   * @param numPartitions  The total number of partitions.
   * @return The partition number for the <code>key</code>.
   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
   *   java.lang.Object, java.lang.Object, int)
   */
  @Override
  public int getPartition(ImmutableBytesWritable key,
      VALUE value, int numPartitions) {
    byte[] region = null;
    // Only one region return 0
    if (this.startKeys.length == 1){
      return 0;
    }
    try {
      // Not sure if this is cached after a split so we could have problems
      // here if a region splits while mapping
      region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
    } catch (IOException e) {
      LOG.error(e);
    }
    if (region == null) {
      // Region lookup failed above; fall back to partition 0 rather than
      // throwing NPE from Bytes.compareTo below.
      return 0;
    }
    for (int i = 0; i < this.startKeys.length; i++){
      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
        if (i >= numPartitions-1){
          // cover if we have less reduces then regions: spread the extra
          // regions over the available partitions by hashing the index.
          return (Integer.toString(i).hashCode()
              & Integer.MAX_VALUE) % numPartitions;
        }
        return i;
      }
    }
    // if above fails to find start key that match we need to return something
    return 0;
  }

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to determine the start keys for the
   * given table.
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = HBaseConfiguration.create(configuration);
    try {
      // Single try block: the original caught the HTable construction failure,
      // then dereferenced the null table in a second try and threw NPE.
      // Fetching the start keys here keeps failure handling in one place.
      this.table = new HTable(this.conf,
          configuration.get(TableOutputFormat.OUTPUT_TABLE));
      this.startKeys = this.table.getStartKeys();
    } catch (IOException e) {
      LOG.error(e);
    }
  }
}