/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

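/**
 * Tests that HRegion mutations which are supposed to be atomic (append, increment,
 * row mutations, multi-row mutations, and checkAndPut) are applied and observed
 * atomically, including when many threads operate on the same rows concurrently.
 */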
@Category(MediumTests.class)
public class TestAtomicOperation {
  static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
  @Rule public TestName name = new TestName();

  HRegion region = null;
  private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");

  @Before
  public void setup() {
    tableName = Bytes.toBytes(name.getMethodName());
  }

  @After
  public void teardown() throws IOException {
    if (region != null) {
      region.close();
      region = null;
    }
  }

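  /**
   * Test append: a second append to the same cells must concatenate onto the values
   * written by the first, and an append with setReturnResults(false) returns null.
   */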
  @Test
  public void testAppend() throws IOException {
    initHRegion(tableName, name.getMethodName(), fam1);
    String v1 = "Ultimate Answer to the Ultimate Question of Life," +
      " The Universe, and Everything";
    String v2 = " is... 42.";
    Append a = new Append(row);
    a.setReturnResults(false);
    a.add(fam1, qual1, Bytes.toBytes(v1));
    a.add(fam1, qual2, Bytes.toBytes(v2));
    assertNull(region.append(a));
    a = new Append(row);
    a.add(fam1, qual1, Bytes.toBytes(v2));
    a.add(fam1, qual2, Bytes.toBytes(v1));
    Result result = region.append(a);
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1)));
    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2)));
  }

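  /**
   * Test multi-threaded increments: 100 threads increment the same row concurrently
   * and the final totals must account for every increment exactly once.
   */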
  @Test
  public void testIncrementMultiThreads() throws IOException {
    LOG.info("Starting test testIncrementMultiThreads");
    initHRegion(tableName, name.getMethodName(), new int[] {1, 3}, fam1, fam2);

    // create 100 threads; thread i increments by its own index i
    int numThreads = 100;
    int incrementsPerThread = 1000;
    Incrementer[] all = new Incrementer[numThreads];
    int expectedTotal = 0;
    for (int i = 0; i < numThreads; i++) {
      all[i] = new Incrementer(region, i, i, incrementsPerThread);
      expectedTotal += (i * incrementsPerThread);
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        // ignore and keep waiting for the remaining threads
      }
    }
    assertICV(row, fam1, qual1, expectedTotal);
    assertICV(row, fam1, qual2, expectedTotal * 2);
    assertICV(row, fam2, qual3, expectedTotal * 3);
    LOG.info("testIncrementMultiThreads successfully verified that total is " +
      expectedTotal);
  }

  private void assertICV(byte[] row,
                         byte[] family,
                         byte[] qualifier,
                         long amount) throws IOException {
    // run a get and check that the stored counter matches the expected amount
    Get get = new Get(row);
    get.addColumn(family, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    long r = Bytes.toLong(CellUtil.cloneValue(kv));
    assertEquals(amount, r);
  }

  private void initHRegion(byte[] tableName, String callingMethod,
      byte[]... families) throws IOException {
    initHRegion(tableName, callingMethod, null, families);
  }

  private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
      byte[]... families) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    int i = 0;
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
      htd.addFamily(hcd);
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    region = TEST_UTIL.createLocalHRegion(info, htd);
  }

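  /**
   * A thread that repeatedly increments qual1, qual2 (by twice the amount) and qual3
   * (by three times the amount) on the shared row, verifying after each call that the
   * three columns remain in a 1:2:3 ratio, i.e. that no partial increment is visible.
   */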
  public static class Incrementer extends Thread {

    private final HRegion region;
    private final int numIncrements;
    private final int amount;

    public Incrementer(HRegion region,
        int threadNumber, int amount, int numIncrements) {
      this.region = region;
      this.numIncrements = numIncrements;
      this.amount = amount;
      setDaemon(true);
    }

    @Override
    public void run() {
      for (int i = 0; i < numIncrements; i++) {
        try {
          Increment inc = new Increment(row);
          inc.addColumn(fam1, qual1, amount);
          inc.addColumn(fam1, qual2, amount * 2);
          inc.addColumn(fam2, qual3, amount * 3);
          inc.setDurability(Durability.ASYNC_WAL);
          region.increment(inc);

          // verify: only complete increments should ever be visible, so the
          // three columns must stay in the 1:2:3 ratio
          Get g = new Get(row);
          Result result = region.get(g);
          assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
            Bytes.toLong(result.getValue(fam1, qual2)));
          assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3,
            Bytes.toLong(result.getValue(fam2, qual3)));
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

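  /**
   * Test multi-threaded appends: every thread appends one byte to the same three
   * columns, so readers must never observe columns of differing lengths and the final
   * length must equal the total number of appends.
   */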
  @Test
  public void testAppendMultiThreads() throws IOException {
    LOG.info("Starting test testAppendMultiThreads");
    initHRegion(tableName, name.getMethodName(), new int[] {1, 3}, fam1, fam2);

    int numThreads = 100;
    int opsPerThread = 100;
    AtomicOperation[] all = new AtomicOperation[numThreads];
    final byte[] val = new byte[] {1};

    AtomicInteger failures = new AtomicInteger(0);
    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
        @Override
        public void run() {
          for (int i = 0; i < numOps; i++) {
            try {
              Append a = new Append(row);
              a.add(fam1, qual1, val);
              a.add(fam1, qual2, val);
              a.add(fam2, qual3, val);
              a.setDurability(Durability.ASYNC_WAL);
              region.append(a);

              // verify: only complete appends should ever be visible, so all
              // three columns must always have the same length
              Get g = new Get(row);
              Result result = region.get(g);
              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        // ignore and keep waiting for the remaining threads
      }
    }
    assertEquals(0, failures.get());
    Get g = new Get(row);
    Result result = region.get(g);
    // 100 threads each appended a single byte 100 times to every column
    assertEquals(10000, result.getValue(fam1, qual1).length);
    assertEquals(10000, result.getValue(fam1, qual2).length);
    assertEquals(10000, result.getValue(fam2, qual3).length);
  }

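  /**
   * Test multi-threaded row mutations: each thread atomically swaps qual1 for qual2
   * (and back) on the same row via mutateRow, so readers must always see exactly one
   * of the two columns.
   */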
  @Test
  public void testRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    int numThreads = 10;
    int opsPerThread = 500;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);

    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int i = 0; i < numOps; i++) {
            try {
              // throw in some flushes and compactions
              if (i % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flushcache();
                  if (i % 100 == 0) {
                    region.compactStores();
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              RowMutations rm = new RowMutations(row);
              if (op) {
                Put p = new Put(row, ts);
                p.add(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
                Delete d = new Delete(row);
                d.deleteColumns(fam1, qual2, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
              } else {
                Delete d = new Delete(row);
                d.deleteColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                rm.add(d);
                Put p = new Put(row, ts);
                p.add(fam1, qual2, value2);
                p.setDurability(Durability.ASYNC_WAL);
                rm.add(p);
              }
              region.mutateRow(rm);
              op ^= true;

              // verify: the row must always hold exactly one of the two columns
              Get g = new Get(row);
              Result r = region.get(g);
              if (r.size() != 1) {
                LOG.debug(r);
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        // ignore and keep waiting for the remaining threads
      }
    }
    assertEquals(0, failures.get());
  }

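  /**
   * Test multi-threaded multi-row mutations: each thread atomically moves a column
   * between row and row2 via mutateRowsWithLocks, so a scan over the two rows must
   * always see exactly one cell.
   */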
  @Test
  public void testMultiRowMutationMultiThreads() throws IOException {
    LOG.info("Starting test testMultiRowMutationMultiThreads");
    initHRegion(tableName, name.getMethodName(), fam1);

    int numThreads = 10;
    int opsPerThread = 500;
    AtomicOperation[] all = new AtomicOperation[numThreads];

    AtomicLong timeStamps = new AtomicLong(0);
    AtomicInteger failures = new AtomicInteger(0);
    final List<byte[]> rowsToLock = Arrays.asList(row, row2);

    for (int i = 0; i < numThreads; i++) {
      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
        @Override
        public void run() {
          boolean op = true;
          for (int i = 0; i < numOps; i++) {
            try {
              // throw in some flushes and compactions
              if (i % 10 == 0) {
                synchronized (region) {
                  LOG.debug("flushing");
                  region.flushcache();
                  if (i % 100 == 0) {
                    region.compactStores();
                  }
                }
              }
              long ts = timeStamps.incrementAndGet();
              List<Mutation> mrm = new ArrayList<Mutation>();
              if (op) {
                Put p = new Put(row2, ts);
                p.add(fam1, qual1, value1);
                p.setDurability(Durability.ASYNC_WAL);
                mrm.add(p);
                Delete d = new Delete(row);
                d.deleteColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
              } else {
                Delete d = new Delete(row2);
                d.deleteColumns(fam1, qual1, ts);
                d.setDurability(Durability.ASYNC_WAL);
                mrm.add(d);
                Put p = new Put(row, ts);
                p.setDurability(Durability.ASYNC_WAL);
                p.add(fam1, qual1, value2);
                mrm.add(p);
              }
              region.mutateRowsWithLocks(mrm, rowsToLock);
              op ^= true;

              // verify: the column must live on exactly one of the two rows
              Scan s = new Scan(row);
              RegionScanner rs = region.getScanner(s);
              List<Cell> r = new ArrayList<Cell>();
              while (rs.next(r));
              rs.close();
              if (r.size() != 1) {
                LOG.debug(r);
                failures.incrementAndGet();
                fail();
              }
            } catch (IOException e) {
              e.printStackTrace();
              failures.incrementAndGet();
              fail();
            }
          }
        }
      };
    }

    // run all threads
    for (int i = 0; i < numThreads; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < numThreads; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        // ignore and keep waiting for the remaining threads
      }
    }
    assertEquals(0, failures.get());
  }

  /**
   * Base class for the worker threads used by the multi-threaded tests above.
   */
  public static class AtomicOperation extends Thread {
    protected final HRegion region;
    protected final int numOps;
    protected final AtomicLong timeStamps;
    protected final AtomicInteger failures;
    protected final Random r = new Random();

    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
        AtomicInteger failures) {
      this.region = region;
      this.numOps = numOps;
      this.timeStamps = timeStamps;
      this.failures = failures;
    }
  }

  private static CountDownLatch latch = new CountDownLatch(1);
  private enum TestStep {
    INIT,
    PUT_STARTED,
    PUT_COMPLETED,
    CHECKANDPUT_STARTED,
    CHECKANDPUT_COMPLETED
  }
  private static volatile TestStep testStep = TestStep.INIT;
  private final String family = "f1";

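  /**
   * Test a put racing a checkAndPut on the same cell through MockHRegion. The
   * checkAndPut compares against the original value ("10"); it must not evaluate the
   * comparison until the concurrent put of "50" is fully visible, so the checkAndPut
   * fails and "50" is the value left in the region.
   */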
  @Test
  public void testPutAndCheckAndPutInParallel() throws Exception {
    final String tableName = "testPutAndCheckAndPut";
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
    final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
      null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));

    Put[] puts = new Put[1];
    Put put = new Put(Bytes.toBytes("r1"));
    put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
    puts[0] = put;
    region.batchMutate(puts);

    MultithreadedTestUtil.TestContext ctx =
      new MultithreadedTestUtil.TestContext(conf);
    ctx.addThread(new PutThread(ctx, region));
    ctx.addThread(new CheckAndPutThread(ctx, region));
    ctx.startThreads();
    while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
      Thread.sleep(100);
    }
    ctx.stop();

    // the value written by the put ("50") must have survived the racing checkAndPut
    Scan s = new Scan();
    RegionScanner scanner = region.getScanner(s);
    List<Cell> results = new ArrayList<Cell>();
    scanner.next(results, 2);
    for (Cell keyValue : results) {
      assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue)));
    }
  }

  private class PutThread extends TestThread {
    private MockHRegion region;

    PutThread(TestContext ctx, MockHRegion region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
      puts[0] = put;
      testStep = TestStep.PUT_STARTED;
      region.batchMutate(puts);
    }
  }

  private class CheckAndPutThread extends TestThread {
    private MockHRegion region;

    CheckAndPutThread(TestContext ctx, MockHRegion region) {
      super(ctx);
      this.region = region;
    }

    @Override
    public void doWork() throws Exception {
      Put[] puts = new Put[1];
      Put put = new Put(Bytes.toBytes("r1"));
      put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
      puts[0] = put;
      // wait until the put has released its row lock (PUT_COMPLETED is set in
      // WrappedRowLock.release) before issuing the racing checkAndPut
      while (testStep != TestStep.PUT_COMPLETED) {
        Thread.sleep(100);
      }
      testStep = TestStep.CHECKANDPUT_STARTED;
      region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
        CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
      testStep = TestStep.CHECKANDPUT_COMPLETED;
    }
  }

  /**
   * An HRegion that lets the test interpose on row-lock acquisition and release in
   * order to control the interleaving of the put and the checkAndPut.
   */
  public static class MockHRegion extends HRegion {

    public MockHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
        final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
    }

    @Override
    public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
      if (testStep == TestStep.CHECKANDPUT_STARTED) {
        latch.countDown();
      }
      return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
    }

    public class WrappedRowLock extends RowLock {

      private WrappedRowLock(RowLock rowLock) {
        super(rowLock.context);
      }

      @Override
      public void release() {
        if (testStep == TestStep.INIT) {
          super.release();
          return;
        }

        if (testStep == TestStep.PUT_STARTED) {
          try {
            testStep = TestStep.PUT_COMPLETED;
            super.release();
            // The put has now released its row lock but has not yet completed its
            // MVCC transaction. Hold the put thread here until the checkAndPut
            // thread has started acquiring the row lock (the latch is counted down
            // in getRowLockInternal), then pause briefly so the checkAndPut's read
            // races the still in-flight put.
            latch.await();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else if (testStep == TestStep.CHECKANDPUT_STARTED) {
          super.release();
        }
      }
    }
  }
}