1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public abstract class TestTableInputFormatScanBase {
63
64 static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
65 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
66
67 static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
68 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
69 static final String KEY_STARTROW = "startRow";
70 static final String KEY_LASTROW = "stpRow";
71
72 private static HTable table = null;
73
74 @BeforeClass
75 public static void setUpBeforeClass() throws Exception {
76
77
78 System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
79
80
81 TEST_UTIL.enableDebug(TableInputFormat.class);
82 TEST_UTIL.enableDebug(TableInputFormatBase.class);
83
84 TEST_UTIL.startMiniCluster(3);
85
86 table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
87 TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
88 TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
89
90 TEST_UTIL.startMiniMapReduceCluster();
91 }
92
93 @AfterClass
94 public static void tearDownAfterClass() throws Exception {
95 TEST_UTIL.shutdownMiniMapReduceCluster();
96 TEST_UTIL.shutdownMiniCluster();
97 }
98
99
100
101
102 public static class ScanMapper
103 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
104
105
106
107
108
109
110
111
112
113 @Override
114 public void map(ImmutableBytesWritable key, Result value,
115 Context context)
116 throws IOException, InterruptedException {
117 if (value.size() != 1) {
118 throw new IOException("There should only be one input column");
119 }
120 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
121 cf = value.getMap();
122 if(!cf.containsKey(INPUT_FAMILY)) {
123 throw new IOException("Wrong input columns. Missing: '" +
124 Bytes.toString(INPUT_FAMILY) + "'.");
125 }
126 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
127 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
128 ", value -> " + val);
129 context.write(key, key);
130 }
131
132 }
133
134
135
136
137 public static class ScanReducer
138 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
139 NullWritable, NullWritable> {
140
141 private String first = null;
142 private String last = null;
143
144 protected void reduce(ImmutableBytesWritable key,
145 Iterable<ImmutableBytesWritable> values, Context context)
146 throws IOException ,InterruptedException {
147 int count = 0;
148 for (ImmutableBytesWritable value : values) {
149 String val = Bytes.toStringBinary(value.get());
150 LOG.info("reduce: key[" + count + "] -> " +
151 Bytes.toStringBinary(key.get()) + ", value -> " + val);
152 if (first == null) first = val;
153 last = val;
154 count++;
155 }
156 }
157
158 protected void cleanup(Context context)
159 throws IOException, InterruptedException {
160 Configuration c = context.getConfiguration();
161 String startRow = c.get(KEY_STARTROW);
162 String lastRow = c.get(KEY_LASTROW);
163 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
164 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
165 if (startRow != null && startRow.length() > 0) {
166 assertEquals(startRow, first);
167 }
168 if (lastRow != null && lastRow.length() > 0) {
169 assertEquals(lastRow, last);
170 }
171 }
172
173 }
174
175
176
177
178
179
180
181
182 protected void testScanFromConfiguration(String start, String stop, String last)
183 throws IOException, InterruptedException, ClassNotFoundException {
184 String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
185 "To" + (stop != null ? stop.toUpperCase() : "Empty");
186 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
187 c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
188 c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
189 c.set(KEY_STARTROW, start != null ? start : "");
190 c.set(KEY_LASTROW, last != null ? last : "");
191
192 if (start != null) {
193 c.set(TableInputFormat.SCAN_ROW_START, start);
194 }
195
196 if (stop != null) {
197 c.set(TableInputFormat.SCAN_ROW_STOP, stop);
198 }
199
200 Job job = new Job(c, jobName);
201 job.setMapperClass(ScanMapper.class);
202 job.setReducerClass(ScanReducer.class);
203 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
204 job.setMapOutputValueClass(ImmutableBytesWritable.class);
205 job.setInputFormatClass(TableInputFormat.class);
206 job.setNumReduceTasks(1);
207 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
208 TableMapReduceUtil.addDependencyJars(job);
209 assertTrue(job.waitForCompletion(true));
210 }
211
212
213
214
215
216
217
218
219 protected void testScan(String start, String stop, String last)
220 throws IOException, InterruptedException, ClassNotFoundException {
221 String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
222 "To" + (stop != null ? stop.toUpperCase() : "Empty");
223 LOG.info("Before map/reduce startup - job " + jobName);
224 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
225 Scan scan = new Scan();
226 scan.addFamily(INPUT_FAMILY);
227 if (start != null) {
228 scan.setStartRow(Bytes.toBytes(start));
229 }
230 c.set(KEY_STARTROW, start != null ? start : "");
231 if (stop != null) {
232 scan.setStopRow(Bytes.toBytes(stop));
233 }
234 c.set(KEY_LASTROW, last != null ? last : "");
235 LOG.info("scan before: " + scan);
236 Job job = new Job(c, jobName);
237 TableMapReduceUtil.initTableMapperJob(
238 Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
239 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
240 job.setReducerClass(ScanReducer.class);
241 job.setNumReduceTasks(1);
242 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
243 LOG.info("Started " + job.getJobName());
244 assertTrue(job.waitForCompletion(true));
245 LOG.info("After map/reduce completion - job " + jobName);
246 }
247
248
249
250
251
252
253
254
255
256 public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
257 InterruptedException,
258 ClassNotFoundException {
259 String jobName = "TestJobForNumOfSplits";
260 LOG.info("Before map/reduce startup - job " + jobName);
261 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
262 Scan scan = new Scan();
263 scan.addFamily(INPUT_FAMILY);
264 c.set("hbase.mapreduce.input.autobalance", "true");
265 c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
266 c.set(KEY_STARTROW, "");
267 c.set(KEY_LASTROW, "");
268 Job job = new Job(c, jobName);
269 TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
270 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
271 TableInputFormat tif = new TableInputFormat();
272 tif.setConf(job.getConfiguration());
273 Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
274 List<InputSplit> splits = tif.getSplits(job);
275 Assert.assertEquals(expectedNumOfSplits, splits.size());
276 }
277
278
279
280
281 public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
282 byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
283 Assert.assertArrayEquals(splitKey, result);
284 }
285 }
286