1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Random;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.HBaseAdmin;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
39 import org.apache.hadoop.hbase.testclassification.MediumTests;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.util.StringUtils;
42 import org.apache.hadoop.util.Tool;
43 import org.apache.hadoop.util.ToolRunner;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47 import com.google.common.collect.Lists;
48
49
50
51
52
53
54
55
56 @Category(MediumTests.class)
57 public class TestAcidGuarantees implements Tool {
58 protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
59 public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
60 public static final byte [] FAMILY_A = Bytes.toBytes("A");
61 public static final byte [] FAMILY_B = Bytes.toBytes("B");
62 public static final byte [] FAMILY_C = Bytes.toBytes("C");
63 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
64
65 public static final byte[][] FAMILIES = new byte[][] {
66 FAMILY_A, FAMILY_B, FAMILY_C };
67
68 private HBaseTestingUtility util;
69
70 public static int NUM_COLS_TO_CHECK = 50;
71
72
73 private Configuration conf;
74
75 private void createTableIfMissing()
76 throws IOException {
77 try {
78 util.createTable(TABLE_NAME, FAMILIES);
79 } catch (TableExistsException tee) {
80 }
81 }
82
/**
 * Builds the test with a configuration tuned for the mini cluster: a
 * 128 KB memstore flush size and a constant-size region split policy.
 *
 * <p>The configuration is also stored in the {@code conf} field so that
 * {@link #getConf()} does not return {@code null} when the instance is
 * created directly (for example by JUnit) rather than through
 * {@code ToolRunner}, which calls {@link #setConf(Configuration)}.
 */
public TestAcidGuarantees() {
  Configuration miniClusterConf = HBaseConfiguration.create();

  // Small flush size (128 KB); presumably chosen so flushes happen often
  // while the readers are running -- TODO confirm intent.
  miniClusterConf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
      String.valueOf(128 * 1024));

  // Pin the split policy to ConstantSizeRegionSplitPolicy.
  miniClusterConf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      ConstantSizeRegionSplitPolicy.class.getName());

  // Previously a local variable named "conf" shadowed the field, leaving the
  // field null; assign it explicitly.
  this.conf = miniClusterConf;
  this.util = new HBaseTestingUtility(miniClusterConf);
}
92
93
94
95
96 public static class AtomicityWriter extends RepeatingTestThread {
97 Random rand = new Random();
98 byte data[] = new byte[10];
99 byte targetRows[][];
100 byte targetFamilies[][];
101 HTable table;
102 AtomicLong numWritten = new AtomicLong();
103
104 public AtomicityWriter(TestContext ctx, byte targetRows[][],
105 byte targetFamilies[][]) throws IOException {
106 super(ctx);
107 this.targetRows = targetRows;
108 this.targetFamilies = targetFamilies;
109 table = new HTable(ctx.getConf(), TABLE_NAME);
110 }
111 public void doAnAction() throws Exception {
112
113 byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
114 Put p = new Put(targetRow);
115 rand.nextBytes(data);
116
117 for (byte[] family : targetFamilies) {
118 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
119 byte qualifier[] = Bytes.toBytes("col" + i);
120 p.add(family, qualifier, data);
121 }
122 }
123 table.put(p);
124 numWritten.getAndIncrement();
125 }
126 }
127
128
129
130
131
132 public static class AtomicGetReader extends RepeatingTestThread {
133 byte targetRow[];
134 byte targetFamilies[][];
135 HTable table;
136 int numVerified = 0;
137 AtomicLong numRead = new AtomicLong();
138
139 public AtomicGetReader(TestContext ctx, byte targetRow[],
140 byte targetFamilies[][]) throws IOException {
141 super(ctx);
142 this.targetRow = targetRow;
143 this.targetFamilies = targetFamilies;
144 table = new HTable(ctx.getConf(), TABLE_NAME);
145 }
146
147 public void doAnAction() throws Exception {
148 Get g = new Get(targetRow);
149 Result res = table.get(g);
150 byte[] gotValue = null;
151 if (res.getRow() == null) {
152
153
154
155 return;
156 }
157
158 for (byte[] family : targetFamilies) {
159 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
160 byte qualifier[] = Bytes.toBytes("col" + i);
161 byte thisValue[] = res.getValue(family, qualifier);
162 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
163 gotFailure(gotValue, res);
164 }
165 numVerified++;
166 gotValue = thisValue;
167 }
168 }
169 numRead.getAndIncrement();
170 }
171
172 private void gotFailure(byte[] expected, Result res) {
173 StringBuilder msg = new StringBuilder();
174 msg.append("Failed after ").append(numVerified).append("!");
175 msg.append("Expected=").append(Bytes.toStringBinary(expected));
176 msg.append("Got:\n");
177 for (Cell kv : res.listCells()) {
178 msg.append(kv.toString());
179 msg.append(" val= ");
180 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
181 msg.append("\n");
182 }
183 throw new RuntimeException(msg.toString());
184 }
185 }
186
187
188
189
190
191 public static class AtomicScanReader extends RepeatingTestThread {
192 byte targetFamilies[][];
193 HTable table;
194 AtomicLong numScans = new AtomicLong();
195 AtomicLong numRowsScanned = new AtomicLong();
196
197 public AtomicScanReader(TestContext ctx,
198 byte targetFamilies[][]) throws IOException {
199 super(ctx);
200 this.targetFamilies = targetFamilies;
201 table = new HTable(ctx.getConf(), TABLE_NAME);
202 }
203
204 public void doAnAction() throws Exception {
205 Scan s = new Scan();
206 for (byte[] family : targetFamilies) {
207 s.addFamily(family);
208 }
209 ResultScanner scanner = table.getScanner(s);
210
211 for (Result res : scanner) {
212 byte[] gotValue = null;
213
214 for (byte[] family : targetFamilies) {
215 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
216 byte qualifier[] = Bytes.toBytes("col" + i);
217 byte thisValue[] = res.getValue(family, qualifier);
218 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
219 gotFailure(gotValue, res);
220 }
221 gotValue = thisValue;
222 }
223 }
224 numRowsScanned.getAndIncrement();
225 }
226 numScans.getAndIncrement();
227 }
228
229 private void gotFailure(byte[] expected, Result res) {
230 StringBuilder msg = new StringBuilder();
231 msg.append("Failed after ").append(numRowsScanned).append("!");
232 msg.append("Expected=").append(Bytes.toStringBinary(expected));
233 msg.append("Got:\n");
234 for (Cell kv : res.listCells()) {
235 msg.append(kv.toString());
236 msg.append(" val= ");
237 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
238 msg.append("\n");
239 }
240 throw new RuntimeException(msg.toString());
241 }
242 }
243
244 public void runTestAtomicity(long millisToRun,
245 int numWriters,
246 int numGetters,
247 int numScanners,
248 int numUniqueRows) throws Exception {
249 runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
250 }
251
252 public void runTestAtomicity(long millisToRun,
253 int numWriters,
254 int numGetters,
255 int numScanners,
256 int numUniqueRows,
257 final boolean systemTest) throws Exception {
258 createTableIfMissing();
259 TestContext ctx = new TestContext(util.getConfiguration());
260
261 byte rows[][] = new byte[numUniqueRows][];
262 for (int i = 0; i < numUniqueRows; i++) {
263 rows[i] = Bytes.toBytes("test_row_" + i);
264 }
265
266 List<AtomicityWriter> writers = Lists.newArrayList();
267 for (int i = 0; i < numWriters; i++) {
268 AtomicityWriter writer = new AtomicityWriter(
269 ctx, rows, FAMILIES);
270 writers.add(writer);
271 ctx.addThread(writer);
272 }
273
274 ctx.addThread(new RepeatingTestThread(ctx) {
275 HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
276 public void doAnAction() throws Exception {
277 try {
278 admin.flush(TABLE_NAME);
279 } catch(IOException ioe) {
280 LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
281 }
282
283
284
285
286
287
288
289
290 if (systemTest) Thread.sleep(60000);
291 }
292 });
293
294 List<AtomicGetReader> getters = Lists.newArrayList();
295 for (int i = 0; i < numGetters; i++) {
296 AtomicGetReader getter = new AtomicGetReader(
297 ctx, rows[i % numUniqueRows], FAMILIES);
298 getters.add(getter);
299 ctx.addThread(getter);
300 }
301
302 List<AtomicScanReader> scanners = Lists.newArrayList();
303 for (int i = 0; i < numScanners; i++) {
304 AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
305 scanners.add(scanner);
306 ctx.addThread(scanner);
307 }
308
309 ctx.startThreads();
310 ctx.waitFor(millisToRun);
311 ctx.stop();
312
313 LOG.info("Finished test. Writers:");
314 for (AtomicityWriter writer : writers) {
315 LOG.info(" wrote " + writer.numWritten.get());
316 }
317 LOG.info("Readers:");
318 for (AtomicGetReader reader : getters) {
319 LOG.info(" read " + reader.numRead.get());
320 }
321 LOG.info("Scanners:");
322 for (AtomicScanReader scanner : scanners) {
323 LOG.info(" scanned " + scanner.numScans.get());
324 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
325 }
326 }
327
328 @Test
329 public void testGetAtomicity() throws Exception {
330 util.startMiniCluster(1);
331 try {
332 runTestAtomicity(20000, 5, 5, 0, 3);
333 } finally {
334 util.shutdownMiniCluster();
335 }
336 }
337
338 @Test
339 public void testScanAtomicity() throws Exception {
340 util.startMiniCluster(1);
341 try {
342 runTestAtomicity(20000, 5, 0, 5, 3);
343 } finally {
344 util.shutdownMiniCluster();
345 }
346 }
347
348 @Test
349 public void testMixedAtomicity() throws Exception {
350 util.startMiniCluster(1);
351 try {
352 runTestAtomicity(20000, 5, 2, 2, 3);
353 } finally {
354 util.shutdownMiniCluster();
355 }
356 }
357
358
359
360
361 @Override
362 public Configuration getConf() {
363 return conf;
364 }
365
366 @Override
367 public void setConf(Configuration c) {
368 this.conf = c;
369 this.util = new HBaseTestingUtility(c);
370 }
371
372 @Override
373 public int run(String[] arg0) throws Exception {
374 Configuration c = getConf();
375 int millis = c.getInt("millis", 5000);
376 int numWriters = c.getInt("numWriters", 50);
377 int numGetters = c.getInt("numGetters", 2);
378 int numScanners = c.getInt("numScanners", 2);
379 int numUniqueRows = c.getInt("numUniqueRows", 3);
380 runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
381 return 0;
382 }
383
384 public static void main(String args[]) throws Exception {
385 Configuration c = HBaseConfiguration.create();
386 int status;
387 try {
388 TestAcidGuarantees test = new TestAcidGuarantees();
389 status = ToolRunner.run(c, test, args);
390 } catch (Exception e) {
391 LOG.error("Exiting due to error", e);
392 status = -1;
393 }
394 System.exit(status);
395 }
396
397
398 }
399