1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.util.HashMap;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.ClusterStatus;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HTableDescriptor;
38 import org.apache.hadoop.hbase.ServerLoad;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.client.Delete;
42 import org.apache.hadoop.hbase.client.Get;
43 import org.apache.hadoop.hbase.client.HBaseAdmin;
44 import org.apache.hadoop.hbase.client.HTable;
45 import org.apache.hadoop.hbase.client.Put;
46 import org.apache.hadoop.hbase.client.Result;
47 import org.apache.hadoop.hbase.client.ResultScanner;
48 import org.apache.hadoop.hbase.client.Scan;
49 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
50 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
51 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
52 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
53 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54 import org.apache.hadoop.hbase.replication.regionserver.Replication;
55 import org.apache.hadoop.hbase.testclassification.LargeTests;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58 import org.apache.hadoop.hbase.util.JVMClusterUtil;
59 import org.apache.hadoop.mapreduce.Job;
60 import org.junit.Before;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63
64 @Category(LargeTests.class)
65 public class TestReplicationSmallTests extends TestReplicationBase {
66
67 private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
68
69
70
71
72 @Before
73 public void setUp() throws Exception {
74 htable1.setAutoFlush(true, true);
75
76
77 for ( JVMClusterUtil.RegionServerThread r :
78 utility1.getHBaseCluster().getRegionServerThreads()) {
79 r.getRegionServer().getWAL().rollWriter();
80 }
81 utility1.truncateTable(tableName);
82
83
84
85
86
87 Scan scan = new Scan();
88 int lastCount = 0;
89 for (int i = 0; i < NB_RETRIES; i++) {
90 if (i==NB_RETRIES-1) {
91 fail("Waited too much time for truncate");
92 }
93 ResultScanner scanner = htable2.getScanner(scan);
94 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
95 scanner.close();
96 if (res.length != 0) {
97 if (res.length < lastCount) {
98 i--;
99 }
100 lastCount = res.length;
101 LOG.info("Still got " + res.length + " rows");
102 Thread.sleep(SLEEP_TIME);
103 } else {
104 break;
105 }
106 }
107 }
108
109
110
111
112
113
114 @Test(timeout=300000)
115 public void testDeleteTypes() throws Exception {
116 LOG.info("testDeleteTypes");
117 final byte[] v1 = Bytes.toBytes("v1");
118 final byte[] v2 = Bytes.toBytes("v2");
119 final byte[] v3 = Bytes.toBytes("v3");
120 htable1 = new HTable(conf1, tableName);
121
122 long t = EnvironmentEdgeManager.currentTimeMillis();
123
124 Put put = new Put(row);
125 put.add(famName, row, t, v1);
126 htable1.put(put);
127
128 put = new Put(row);
129 put.add(famName, row, t+1, v2);
130 htable1.put(put);
131
132 put = new Put(row);
133 put.add(famName, row, t+2, v3);
134 htable1.put(put);
135
136 Get get = new Get(row);
137 get.setMaxVersions();
138 for (int i = 0; i < NB_RETRIES; i++) {
139 if (i==NB_RETRIES-1) {
140 fail("Waited too much time for put replication");
141 }
142 Result res = htable2.get(get);
143 if (res.size() < 3) {
144 LOG.info("Rows not available");
145 Thread.sleep(SLEEP_TIME);
146 } else {
147 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
148 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
149 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
150 break;
151 }
152 }
153
154 Delete d = new Delete(row);
155 d.deleteColumn(famName, row, t);
156 htable1.delete(d);
157
158 get = new Get(row);
159 get.setMaxVersions();
160 for (int i = 0; i < NB_RETRIES; i++) {
161 if (i==NB_RETRIES-1) {
162 fail("Waited too much time for put replication");
163 }
164 Result res = htable2.get(get);
165 if (res.size() > 2) {
166 LOG.info("Version not deleted");
167 Thread.sleep(SLEEP_TIME);
168 } else {
169 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
170 assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
171 break;
172 }
173 }
174
175
176 d = new Delete(row);
177 d.deleteColumns(famName, row, t+2);
178 htable1.delete(d);
179
180
181
182 get = new Get(row);
183 for (int i = 0; i < NB_RETRIES; i++) {
184 if (i==NB_RETRIES-1) {
185 fail("Waited too much time for del replication");
186 }
187 Result res = htable2.get(get);
188 if (res.size() >= 1) {
189 LOG.info("Rows not deleted");
190 Thread.sleep(SLEEP_TIME);
191 } else {
192 break;
193 }
194 }
195 }
196
197
198
199
200
201 @Test(timeout=300000)
202 public void testSimplePutDelete() throws Exception {
203 LOG.info("testSimplePutDelete");
204 Put put = new Put(row);
205 put.add(famName, row, row);
206
207 htable1 = new HTable(conf1, tableName);
208 htable1.put(put);
209
210 Get get = new Get(row);
211 for (int i = 0; i < NB_RETRIES; i++) {
212 if (i==NB_RETRIES-1) {
213 fail("Waited too much time for put replication");
214 }
215 Result res = htable2.get(get);
216 if (res.size() == 0) {
217 LOG.info("Row not available");
218 Thread.sleep(SLEEP_TIME);
219 } else {
220 assertArrayEquals(res.value(), row);
221 break;
222 }
223 }
224
225 Delete del = new Delete(row);
226 htable1.delete(del);
227
228 get = new Get(row);
229 for (int i = 0; i < NB_RETRIES; i++) {
230 if (i==NB_RETRIES-1) {
231 fail("Waited too much time for del replication");
232 }
233 Result res = htable2.get(get);
234 if (res.size() >= 1) {
235 LOG.info("Row not deleted");
236 Thread.sleep(SLEEP_TIME);
237 } else {
238 break;
239 }
240 }
241 }
242
243
244
245
246
247 @Test(timeout=300000)
248 public void testSmallBatch() throws Exception {
249 LOG.info("testSmallBatch");
250 Put put;
251
252 htable1.setAutoFlush(false, true);
253 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
254 put = new Put(Bytes.toBytes(i));
255 put.add(famName, row, row);
256 htable1.put(put);
257 }
258 htable1.flushCommits();
259
260 Scan scan = new Scan();
261
262 ResultScanner scanner1 = htable1.getScanner(scan);
263 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
264 scanner1.close();
265 assertEquals(NB_ROWS_IN_BATCH, res1.length);
266
267 for (int i = 0; i < NB_RETRIES; i++) {
268 scan = new Scan();
269 if (i==NB_RETRIES-1) {
270 fail("Waited too much time for normal batch replication");
271 }
272 ResultScanner scanner = htable2.getScanner(scan);
273 Result[] res = scanner.next(NB_ROWS_IN_BATCH);
274 scanner.close();
275 if (res.length != NB_ROWS_IN_BATCH) {
276 LOG.info("Only got " + res.length + " rows");
277 Thread.sleep(SLEEP_TIME);
278 } else {
279 break;
280 }
281 }
282 }
283
284
285
286
287
288
289
290 @Test(timeout = 300000)
291 public void testDisableEnable() throws Exception {
292
293
294 admin.disablePeer("2");
295
296 byte[] rowkey = Bytes.toBytes("disable enable");
297 Put put = new Put(rowkey);
298 put.add(famName, row, row);
299 htable1.put(put);
300
301 Get get = new Get(rowkey);
302 for (int i = 0; i < NB_RETRIES; i++) {
303 Result res = htable2.get(get);
304 if (res.size() >= 1) {
305 fail("Replication wasn't disabled");
306 } else {
307 LOG.info("Row not replicated, let's wait a bit more...");
308 Thread.sleep(SLEEP_TIME);
309 }
310 }
311
312
313 admin.enablePeer("2");
314
315 for (int i = 0; i < NB_RETRIES; i++) {
316 Result res = htable2.get(get);
317 if (res.size() == 0) {
318 LOG.info("Row not available");
319 Thread.sleep(SLEEP_TIME);
320 } else {
321 assertArrayEquals(res.value(), row);
322 return;
323 }
324 }
325 fail("Waited too much time for put replication");
326 }
327
328
329
330
331
332
333
334 @Test(timeout=300000)
335 public void testAddAndRemoveClusters() throws Exception {
336 LOG.info("testAddAndRemoveClusters");
337 admin.removePeer("2");
338 Thread.sleep(SLEEP_TIME);
339 byte[] rowKey = Bytes.toBytes("Won't be replicated");
340 Put put = new Put(rowKey);
341 put.add(famName, row, row);
342 htable1.put(put);
343
344 Get get = new Get(rowKey);
345 for (int i = 0; i < NB_RETRIES; i++) {
346 if (i == NB_RETRIES-1) {
347 break;
348 }
349 Result res = htable2.get(get);
350 if (res.size() >= 1) {
351 fail("Not supposed to be replicated");
352 } else {
353 LOG.info("Row not replicated, let's wait a bit more...");
354 Thread.sleep(SLEEP_TIME);
355 }
356 }
357
358 admin.addPeer("2", utility2.getClusterKey());
359 Thread.sleep(SLEEP_TIME);
360 rowKey = Bytes.toBytes("do rep");
361 put = new Put(rowKey);
362 put.add(famName, row, row);
363 LOG.info("Adding new row");
364 htable1.put(put);
365
366 get = new Get(rowKey);
367 for (int i = 0; i < NB_RETRIES; i++) {
368 if (i==NB_RETRIES-1) {
369 fail("Waited too much time for put replication");
370 }
371 Result res = htable2.get(get);
372 if (res.size() == 0) {
373 LOG.info("Row not available");
374 Thread.sleep(SLEEP_TIME*i);
375 } else {
376 assertArrayEquals(res.value(), row);
377 break;
378 }
379 }
380 }
381
382
383
384
385
386
387
388 @Test(timeout=300000)
389 public void testLoading() throws Exception {
390 LOG.info("Writing out rows to table1 in testLoading");
391 htable1.setWriteBufferSize(1024);
392 htable1.setAutoFlush(false, true);
393 for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
394 Put put = new Put(Bytes.toBytes(i));
395 put.add(famName, row, row);
396 htable1.put(put);
397 }
398 htable1.flushCommits();
399
400 Scan scan = new Scan();
401
402 ResultScanner scanner = htable1.getScanner(scan);
403 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
404 scanner.close();
405
406 assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
407
408 LOG.info("Looking in table2 for replicated rows in testLoading");
409 long start = System.currentTimeMillis();
410
411
412 final long retries = NB_RETRIES * 10;
413 for (int i = 0; i < retries; i++) {
414 scan = new Scan();
415 scanner = htable2.getScanner(scan);
416 res = scanner.next(NB_ROWS_IN_BIG_BATCH);
417 scanner.close();
418 if (res.length != NB_ROWS_IN_BIG_BATCH) {
419 if (i == retries - 1) {
420 int lastRow = -1;
421 for (Result result : res) {
422 int currentRow = Bytes.toInt(result.getRow());
423 for (int row = lastRow+1; row < currentRow; row++) {
424 LOG.error("Row missing: " + row);
425 }
426 lastRow = currentRow;
427 }
428 LOG.error("Last row: " + lastRow);
429 fail("Waited too much time for normal batch replication, " +
430 res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
431 (System.currentTimeMillis() - start) + "ms");
432 } else {
433 LOG.info("Only got " + res.length + " rows... retrying");
434 Thread.sleep(SLEEP_TIME);
435 }
436 } else {
437 break;
438 }
439 }
440 }
441
442
443
444
445
446
447
448 @Test(timeout=300000)
449 public void testVerifyRepJob() throws Exception {
450
451
452 testSmallBatch();
453
454 String[] args = new String[] {"2", Bytes.toString(tableName)};
455 Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
456 if (job == null) {
457 fail("Job wasn't created, see the log");
458 }
459 if (!job.waitForCompletion(true)) {
460 fail("Job failed, see the log");
461 }
462 assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
463 findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
464 assertEquals(0, job.getCounters().
465 findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
466
467 Scan scan = new Scan();
468 ResultScanner rs = htable2.getScanner(scan);
469 Put put = null;
470 for (Result result : rs) {
471 put = new Put(result.getRow());
472 Cell firstVal = result.rawCells()[0];
473 put.add(CellUtil.cloneFamily(firstVal),
474 CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
475 htable2.put(put);
476 }
477 Delete delete = new Delete(put.getRow());
478 htable2.delete(delete);
479 job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
480 if (job == null) {
481 fail("Job wasn't created, see the log");
482 }
483 if (!job.waitForCompletion(true)) {
484 fail("Job failed, see the log");
485 }
486 assertEquals(0, job.getCounters().
487 findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
488 assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
489 findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
490 }
491
492
493
494
495
496
497 @Test(timeout=300000)
498 public void testCompactionWALEdits() throws Exception {
499 WALProtos.CompactionDescriptor compactionDescriptor =
500 WALProtos.CompactionDescriptor.getDefaultInstance();
501 HRegionInfo hri = new HRegionInfo(htable1.getName(),
502 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
503 WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
504 Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
505 }
506
507
508
509
510
511
512
513
514 @Test(timeout = 300000)
515 public void testVerifyListReplicatedTable() throws Exception {
516 LOG.info("testVerifyListReplicatedTable");
517
518 final String tName = "VerifyListReplicated_";
519 final String colFam = "cf1";
520 final int numOfTables = 3;
521
522 HBaseAdmin hadmin = new HBaseAdmin(conf1);
523
524
525 for (int i = 0; i < numOfTables; i++) {
526 HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
527 HColumnDescriptor cfd = new HColumnDescriptor(colFam);
528 cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
529 ht.addFamily(cfd);
530 hadmin.createTable(ht);
531 }
532
533
534 List<HashMap<String, String>> replicationColFams = admin.listReplicated();
535 int[] match = new int[numOfTables];
536
537 for (int i = 0; i < replicationColFams.size(); i++) {
538 HashMap<String, String> replicationEntry = replicationColFams.get(i);
539 String tn = replicationEntry.get(ReplicationAdmin.TNAME);
540 if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
541 int m = Integer.parseInt(tn.substring(tn.length() - 1));
542 match[m]++;
543 }
544 }
545
546
547 for (int i = 0; i < match.length; i++) {
548 assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
549 }
550
551
552 for (int i = 0; i < numOfTables; i++) {
553 String ht = tName + i;
554 hadmin.disableTable(ht);
555 hadmin.deleteTable(ht);
556 }
557
558 hadmin.close();
559 }
560
561
562
563
564
565
566
567
568
569 @Test(timeout = 300000)
570 public void testReplicationStatus() throws Exception {
571 LOG.info("testReplicationStatus");
572
573 HBaseAdmin admin = utility1.getHBaseAdmin();
574 try {
575
576 final byte[] qualName = Bytes.toBytes("q");
577 Put p;
578
579 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
580 p = new Put(Bytes.toBytes("row" + i));
581 p.add(famName, qualName, Bytes.toBytes("val" + i));
582 htable1.put(p);
583 }
584
585 ClusterStatus status = admin.getClusterStatus();
586
587 for (ServerName server : status.getServers()) {
588 ServerLoad sl = status.getLoad(server);
589 List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
590 ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
591
592
593 assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
594
595
596 assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
597 (rLoadSink.getAgeOfLastAppliedOp() >= 0));
598 assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
599 (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
600 }
601 } finally {
602 admin.close();
603 }
604 }
605 }