1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.NavigableMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.client.HTable;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Result;
38 import org.apache.hadoop.hbase.client.ResultScanner;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45
46
47
48
49
50
51
52 public abstract class TestTableMapReduceBase {
53
54 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
55 protected static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
56 protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
57 protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
58
59 protected static final byte[][] columns = new byte[][] {
60 INPUT_FAMILY,
61 OUTPUT_FAMILY
62 };
63
64
65
66
67 protected abstract Log getLog();
68
69
70
71
72 protected abstract void runTestOnTable(HTable table) throws IOException;
73
74 @BeforeClass
75 public static void beforeClass() throws Exception {
76 UTIL.startMiniCluster();
77 HTable table =
78 UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
79 UTIL.createMultiRegions(table, INPUT_FAMILY);
80 UTIL.loadTable(table, INPUT_FAMILY, false);
81 UTIL.startMiniMapReduceCluster();
82 }
83
84 @AfterClass
85 public static void afterClass() throws Exception {
86 UTIL.shutdownMiniMapReduceCluster();
87 UTIL.shutdownMiniCluster();
88 }
89
90
91
92
93
94 @Test
95 public void testMultiRegionTable() throws IOException {
96 runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
97 }
98
99 @Test
100 public void testCombiner() throws IOException {
101 Configuration conf = new Configuration(UTIL.getConfiguration());
102
103 conf.setInt("min.num.spills.for.combine", 1);
104 runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
105 }
106
107
108
109
110 protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
111 if (value.size() != 1) {
112 throw new IOException("There should only be one input column");
113 }
114 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
115 cf = value.getMap();
116 if(!cf.containsKey(INPUT_FAMILY)) {
117 throw new IOException("Wrong input columns. Missing: '" +
118 Bytes.toString(INPUT_FAMILY) + "'.");
119 }
120
121
122
123 String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
124 StringBuilder newValue = new StringBuilder(originalValue);
125 newValue.reverse();
126
127
128
129 Put outval = new Put(key.get());
130 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
131 return outval;
132 }
133
134 protected void verify(String tableName) throws IOException {
135 HTable table = new HTable(UTIL.getConfiguration(), tableName);
136 boolean verified = false;
137 long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
138 int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
139 for (int i = 0; i < numRetries; i++) {
140 try {
141 getLog().info("Verification attempt #" + i);
142 verifyAttempt(table);
143 verified = true;
144 break;
145 } catch (NullPointerException e) {
146
147
148 getLog().debug("Verification attempt failed: " + e.getMessage());
149 }
150 try {
151 Thread.sleep(pause);
152 } catch (InterruptedException e) {
153
154 }
155 }
156 assertTrue(verified);
157 }
158
159
160
161
162
163
164
165
166 private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
167 Scan scan = new Scan();
168 TableInputFormat.addColumns(scan, columns);
169 ResultScanner scanner = table.getScanner(scan);
170 try {
171 Iterator<Result> itr = scanner.iterator();
172 assertTrue(itr.hasNext());
173 while(itr.hasNext()) {
174 Result r = itr.next();
175 if (getLog().isDebugEnabled()) {
176 if (r.size() > 2 ) {
177 throw new IOException("Too many results, expected 2 got " +
178 r.size());
179 }
180 }
181 byte[] firstValue = null;
182 byte[] secondValue = null;
183 int count = 0;
184 for(Cell kv : r.listCells()) {
185 if (count == 0) {
186 firstValue = CellUtil.cloneValue(kv);
187 }
188 if (count == 1) {
189 secondValue = CellUtil.cloneValue(kv);
190 }
191 count++;
192 if (count == 2) {
193 break;
194 }
195 }
196
197
198 if (firstValue == null) {
199 throw new NullPointerException(Bytes.toString(r.getRow()) +
200 ": first value is null");
201 }
202 String first = Bytes.toString(firstValue);
203
204 if (secondValue == null) {
205 throw new NullPointerException(Bytes.toString(r.getRow()) +
206 ": second value is null");
207 }
208 byte[] secondReversed = new byte[secondValue.length];
209 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
210 secondReversed[i] = secondValue[j];
211 }
212 String second = Bytes.toString(secondReversed);
213
214 if (first.compareTo(second) != 0) {
215 if (getLog().isDebugEnabled()) {
216 getLog().debug("second key is not the reverse of first. row=" +
217 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
218 ", second value=" + second);
219 }
220 fail();
221 }
222 }
223 } finally {
224 scanner.close();
225 }
226 }
227 }