View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.util.HashMap;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.ClusterStatus;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.HTableDescriptor;
38  import org.apache.hadoop.hbase.ServerLoad;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.client.Delete;
42  import org.apache.hadoop.hbase.client.Get;
43  import org.apache.hadoop.hbase.client.HBaseAdmin;
44  import org.apache.hadoop.hbase.client.HTable;
45  import org.apache.hadoop.hbase.client.Put;
46  import org.apache.hadoop.hbase.client.Result;
47  import org.apache.hadoop.hbase.client.ResultScanner;
48  import org.apache.hadoop.hbase.client.Scan;
49  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
50  import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
51  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
52  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
53  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54  import org.apache.hadoop.hbase.replication.regionserver.Replication;
55  import org.apache.hadoop.hbase.testclassification.LargeTests;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.hadoop.hbase.util.JVMClusterUtil;
59  import org.apache.hadoop.mapreduce.Job;
60  import org.junit.Before;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  
64  @Category(LargeTests.class)
65  public class TestReplicationSmallTests extends TestReplicationBase {
66  
67    private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
68  
69    /**
70     * @throws java.lang.Exception
71     */
72    @Before
73    public void setUp() throws Exception {
74      htable1.setAutoFlush(true, true);
75      // Starting and stopping replication can make us miss new logs,
76      // rolling like this makes sure the most recent one gets added to the queue
77      for ( JVMClusterUtil.RegionServerThread r :
78          utility1.getHBaseCluster().getRegionServerThreads()) {
79        r.getRegionServer().getWAL().rollWriter();
80      }
81      utility1.truncateTable(tableName);
82      // truncating the table will send one Delete per row to the slave cluster
83      // in an async fashion, which is why we cannot just call truncateTable on
84      // utility2 since late writes could make it to the slave in some way.
85      // Instead, we truncate the first table and wait for all the Deletes to
86      // make it to the slave.
87      Scan scan = new Scan();
88      int lastCount = 0;
89      for (int i = 0; i < NB_RETRIES; i++) {
90        if (i==NB_RETRIES-1) {
91          fail("Waited too much time for truncate");
92        }
93        ResultScanner scanner = htable2.getScanner(scan);
94        Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
95        scanner.close();
96        if (res.length != 0) {
97          if (res.length < lastCount) {
98            i--; // Don't increment timeout if we make progress
99          }
100         lastCount = res.length;
101         LOG.info("Still got " + res.length + " rows");
102         Thread.sleep(SLEEP_TIME);
103       } else {
104         break;
105       }
106     }
107   }
108 
109   /**
110    * Verify that version and column delete marker types are replicated
111    * correctly.
112    * @throws Exception
113    */
114   @Test(timeout=300000)
115   public void testDeleteTypes() throws Exception {
116     LOG.info("testDeleteTypes");
117     final byte[] v1 = Bytes.toBytes("v1");
118     final byte[] v2 = Bytes.toBytes("v2");
119     final byte[] v3 = Bytes.toBytes("v3");
120     htable1 = new HTable(conf1, tableName);
121 
122     long t = EnvironmentEdgeManager.currentTimeMillis();
123     // create three versions for "row"
124     Put put = new Put(row);
125     put.add(famName, row, t, v1);
126     htable1.put(put);
127 
128     put = new Put(row);
129     put.add(famName, row, t+1, v2);
130     htable1.put(put);
131 
132     put = new Put(row);
133     put.add(famName, row, t+2, v3);
134     htable1.put(put);
135 
136     Get get = new Get(row);
137     get.setMaxVersions();
138     for (int i = 0; i < NB_RETRIES; i++) {
139       if (i==NB_RETRIES-1) {
140         fail("Waited too much time for put replication");
141       }
142       Result res = htable2.get(get);
143       if (res.size() < 3) {
144         LOG.info("Rows not available");
145         Thread.sleep(SLEEP_TIME);
146       } else {
147         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
148         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
149         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
150         break;
151       }
152     }
153     // place a version delete marker (delete last version)
154     Delete d = new Delete(row);
155     d.deleteColumn(famName, row, t);
156     htable1.delete(d);
157 
158     get = new Get(row);
159     get.setMaxVersions();
160     for (int i = 0; i < NB_RETRIES; i++) {
161       if (i==NB_RETRIES-1) {
162         fail("Waited too much time for put replication");
163       }
164       Result res = htable2.get(get);
165       if (res.size() > 2) {
166         LOG.info("Version not deleted");
167         Thread.sleep(SLEEP_TIME);
168       } else {
169         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
170         assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
171         break;
172       }
173     }
174 
175     // place a column delete marker
176     d = new Delete(row);
177     d.deleteColumns(famName, row, t+2);
178     htable1.delete(d);
179 
180     // now *both* of the remaining version should be deleted
181     // at the replica
182     get = new Get(row);
183     for (int i = 0; i < NB_RETRIES; i++) {
184       if (i==NB_RETRIES-1) {
185         fail("Waited too much time for del replication");
186       }
187       Result res = htable2.get(get);
188       if (res.size() >= 1) {
189         LOG.info("Rows not deleted");
190         Thread.sleep(SLEEP_TIME);
191       } else {
192         break;
193       }
194     }
195   }
196 
197   /**
198    * Add a row, check it's replicated, delete it, check's gone
199    * @throws Exception
200    */
201   @Test(timeout=300000)
202   public void testSimplePutDelete() throws Exception {
203     LOG.info("testSimplePutDelete");
204     Put put = new Put(row);
205     put.add(famName, row, row);
206 
207     htable1 = new HTable(conf1, tableName);
208     htable1.put(put);
209 
210     Get get = new Get(row);
211     for (int i = 0; i < NB_RETRIES; i++) {
212       if (i==NB_RETRIES-1) {
213         fail("Waited too much time for put replication");
214       }
215       Result res = htable2.get(get);
216       if (res.size() == 0) {
217         LOG.info("Row not available");
218         Thread.sleep(SLEEP_TIME);
219       } else {
220         assertArrayEquals(res.value(), row);
221         break;
222       }
223     }
224 
225     Delete del = new Delete(row);
226     htable1.delete(del);
227 
228     get = new Get(row);
229     for (int i = 0; i < NB_RETRIES; i++) {
230       if (i==NB_RETRIES-1) {
231         fail("Waited too much time for del replication");
232       }
233       Result res = htable2.get(get);
234       if (res.size() >= 1) {
235         LOG.info("Row not deleted");
236         Thread.sleep(SLEEP_TIME);
237       } else {
238         break;
239       }
240     }
241   }
242 
243   /**
244    * Try a small batch upload using the write buffer, check it's replicated
245    * @throws Exception
246    */
247   @Test(timeout=300000)
248   public void testSmallBatch() throws Exception {
249     LOG.info("testSmallBatch");
250     Put put;
251     // normal Batch tests
252     htable1.setAutoFlush(false, true);
253     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
254       put = new Put(Bytes.toBytes(i));
255       put.add(famName, row, row);
256       htable1.put(put);
257     }
258     htable1.flushCommits();
259 
260     Scan scan = new Scan();
261 
262     ResultScanner scanner1 = htable1.getScanner(scan);
263     Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
264     scanner1.close();
265     assertEquals(NB_ROWS_IN_BATCH, res1.length);
266 
267     for (int i = 0; i < NB_RETRIES; i++) {
268       scan = new Scan();
269       if (i==NB_RETRIES-1) {
270         fail("Waited too much time for normal batch replication");
271       }
272       ResultScanner scanner = htable2.getScanner(scan);
273       Result[] res = scanner.next(NB_ROWS_IN_BATCH);
274       scanner.close();
275       if (res.length != NB_ROWS_IN_BATCH) {
276         LOG.info("Only got " + res.length + " rows");
277         Thread.sleep(SLEEP_TIME);
278       } else {
279         break;
280       }
281     }
282   }
283 
284   /**
285    * Test disable/enable replication, trying to insert, make sure nothing's
286    * replicated, enable it, the insert should be replicated
287    *
288    * @throws Exception
289    */
290   @Test(timeout = 300000)
291   public void testDisableEnable() throws Exception {
292 
293     // Test disabling replication
294     admin.disablePeer("2");
295 
296     byte[] rowkey = Bytes.toBytes("disable enable");
297     Put put = new Put(rowkey);
298     put.add(famName, row, row);
299     htable1.put(put);
300 
301     Get get = new Get(rowkey);
302     for (int i = 0; i < NB_RETRIES; i++) {
303       Result res = htable2.get(get);
304       if (res.size() >= 1) {
305         fail("Replication wasn't disabled");
306       } else {
307         LOG.info("Row not replicated, let's wait a bit more...");
308         Thread.sleep(SLEEP_TIME);
309       }
310     }
311 
312     // Test enable replication
313     admin.enablePeer("2");
314 
315     for (int i = 0; i < NB_RETRIES; i++) {
316       Result res = htable2.get(get);
317       if (res.size() == 0) {
318         LOG.info("Row not available");
319         Thread.sleep(SLEEP_TIME);
320       } else {
321         assertArrayEquals(res.value(), row);
322         return;
323       }
324     }
325     fail("Waited too much time for put replication");
326   }
327 
328   /**
329    * Integration test for TestReplicationAdmin, removes and re-add a peer
330    * cluster
331    *
332    * @throws Exception
333    */
334   @Test(timeout=300000)
335   public void testAddAndRemoveClusters() throws Exception {
336     LOG.info("testAddAndRemoveClusters");
337     admin.removePeer("2");
338     Thread.sleep(SLEEP_TIME);
339     byte[] rowKey = Bytes.toBytes("Won't be replicated");
340     Put put = new Put(rowKey);
341     put.add(famName, row, row);
342     htable1.put(put);
343 
344     Get get = new Get(rowKey);
345     for (int i = 0; i < NB_RETRIES; i++) {
346       if (i == NB_RETRIES-1) {
347         break;
348       }
349       Result res = htable2.get(get);
350       if (res.size() >= 1) {
351         fail("Not supposed to be replicated");
352       } else {
353         LOG.info("Row not replicated, let's wait a bit more...");
354         Thread.sleep(SLEEP_TIME);
355       }
356     }
357 
358     admin.addPeer("2", utility2.getClusterKey());
359     Thread.sleep(SLEEP_TIME);
360     rowKey = Bytes.toBytes("do rep");
361     put = new Put(rowKey);
362     put.add(famName, row, row);
363     LOG.info("Adding new row");
364     htable1.put(put);
365 
366     get = new Get(rowKey);
367     for (int i = 0; i < NB_RETRIES; i++) {
368       if (i==NB_RETRIES-1) {
369         fail("Waited too much time for put replication");
370       }
371       Result res = htable2.get(get);
372       if (res.size() == 0) {
373         LOG.info("Row not available");
374         Thread.sleep(SLEEP_TIME*i);
375       } else {
376         assertArrayEquals(res.value(), row);
377         break;
378       }
379     }
380   }
381 
382 
383   /**
384    * Do a more intense version testSmallBatch, one  that will trigger
385    * hlog rolling and other non-trivial code paths
386    * @throws Exception
387    */
388   @Test(timeout=300000)
389   public void testLoading() throws Exception {
390     LOG.info("Writing out rows to table1 in testLoading");
391     htable1.setWriteBufferSize(1024);
392     htable1.setAutoFlush(false, true);
393     for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
394       Put put = new Put(Bytes.toBytes(i));
395       put.add(famName, row, row);
396       htable1.put(put);
397     }
398     htable1.flushCommits();
399 
400     Scan scan = new Scan();
401 
402     ResultScanner scanner = htable1.getScanner(scan);
403     Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
404     scanner.close();
405 
406     assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
407 
408     LOG.info("Looking in table2 for replicated rows in testLoading");
409     long start = System.currentTimeMillis();
410     // Retry more than NB_RETRIES.  As it was, retries were done in 5 seconds and we'd fail
411     // sometimes.
412     final long retries = NB_RETRIES * 10;
413     for (int i = 0; i < retries; i++) {
414       scan = new Scan();
415       scanner = htable2.getScanner(scan);
416       res = scanner.next(NB_ROWS_IN_BIG_BATCH);
417       scanner.close();
418       if (res.length != NB_ROWS_IN_BIG_BATCH) {
419         if (i == retries - 1) {
420           int lastRow = -1;
421           for (Result result : res) {
422             int currentRow = Bytes.toInt(result.getRow());
423             for (int row = lastRow+1; row < currentRow; row++) {
424               LOG.error("Row missing: " + row);
425             }
426             lastRow = currentRow;
427           }
428           LOG.error("Last row: " + lastRow);
429           fail("Waited too much time for normal batch replication, " +
430             res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
431             (System.currentTimeMillis() - start) + "ms");
432         } else {
433           LOG.info("Only got " + res.length + " rows... retrying");
434           Thread.sleep(SLEEP_TIME);
435         }
436       } else {
437         break;
438       }
439     }
440   }
441 
442   /**
443    * Do a small loading into a table, make sure the data is really the same,
444    * then run the VerifyReplication job to check the results. Do a second
445    * comparison where all the cells are different.
446    * @throws Exception
447    */
448   @Test(timeout=300000)
449   public void testVerifyRepJob() throws Exception {
450     // Populate the tables, at the same time it guarantees that the tables are
451     // identical since it does the check
452     testSmallBatch();
453 
454     String[] args = new String[] {"2", Bytes.toString(tableName)};
455     Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
456     if (job == null) {
457       fail("Job wasn't created, see the log");
458     }
459     if (!job.waitForCompletion(true)) {
460       fail("Job failed, see the log");
461     }
462     assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
463         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
464     assertEquals(0, job.getCounters().
465         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
466 
467     Scan scan = new Scan();
468     ResultScanner rs = htable2.getScanner(scan);
469     Put put = null;
470     for (Result result : rs) {
471       put = new Put(result.getRow());
472       Cell firstVal = result.rawCells()[0];
473       put.add(CellUtil.cloneFamily(firstVal),
474           CellUtil.cloneQualifier(firstVal), Bytes.toBytes("diff data"));
475       htable2.put(put);
476     }
477     Delete delete = new Delete(put.getRow());
478     htable2.delete(delete);
479     job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
480     if (job == null) {
481       fail("Job wasn't created, see the log");
482     }
483     if (!job.waitForCompletion(true)) {
484       fail("Job failed, see the log");
485     }
486     assertEquals(0, job.getCounters().
487         findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
488     assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
489         findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
490   }
491 
492   /**
493    * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
494    * the compaction WALEdit
495    * @throws Exception
496    */
497   @Test(timeout=300000)
498   public void testCompactionWALEdits() throws Exception {
499     WALProtos.CompactionDescriptor compactionDescriptor =
500         WALProtos.CompactionDescriptor.getDefaultInstance();
501     HRegionInfo hri = new HRegionInfo(htable1.getName(),
502       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
503     WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
504     Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
505   }
506 
507   /**
508    * Test for HBASE-8663
509    * Create two new Tables with colfamilies enabled for replication then run
510    * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
511    * TestReplicationAdmin is a better place for this testing but it would need mocks.
512    * @throws Exception
513    */
514   @Test(timeout = 300000)
515   public void testVerifyListReplicatedTable() throws Exception {
516 	LOG.info("testVerifyListReplicatedTable");
517 
518     final String tName = "VerifyListReplicated_";
519     final String colFam = "cf1";
520     final int numOfTables = 3;
521 
522     HBaseAdmin hadmin = new HBaseAdmin(conf1);
523 
524     // Create Tables
525     for (int i = 0; i < numOfTables; i++) {
526       HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
527       HColumnDescriptor cfd = new HColumnDescriptor(colFam);
528       cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
529       ht.addFamily(cfd);
530       hadmin.createTable(ht);
531     }
532 
533     // verify the result
534     List<HashMap<String, String>> replicationColFams = admin.listReplicated();
535     int[] match = new int[numOfTables]; // array of 3 with init value of zero
536 
537     for (int i = 0; i < replicationColFams.size(); i++) {
538       HashMap<String, String> replicationEntry = replicationColFams.get(i);
539       String tn = replicationEntry.get(ReplicationAdmin.TNAME);
540       if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
541         int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
542         match[m]++; // should only increase once
543       }
544     }
545 
546     // check the matching result
547     for (int i = 0; i < match.length; i++) {
548       assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
549     }
550 
551     // drop tables
552     for (int i = 0; i < numOfTables; i++) {
553       String ht = tName + i;
554       hadmin.disableTable(ht);
555       hadmin.deleteTable(ht);
556     }
557 
558     hadmin.close();
559   }
560 
561   /**
562    * Test for HBASE-9531
563    * put a few rows into htable1, which should be replicated to htable2
564    * create a ClusterStatus instance 'status' from HBaseAdmin
565    * test : status.getLoad(server).getReplicationLoadSourceList()
566    * test : status.getLoad(server).getReplicationLoadSink()
567    * * @throws Exception
568    */
569   @Test(timeout = 300000)
570   public void testReplicationStatus() throws Exception {
571     LOG.info("testReplicationStatus");
572 
573     HBaseAdmin admin = utility1.getHBaseAdmin();
574     try {
575 
576       final byte[] qualName = Bytes.toBytes("q");
577       Put p;
578 
579       for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
580         p = new Put(Bytes.toBytes("row" + i));
581         p.add(famName, qualName, Bytes.toBytes("val" + i));
582         htable1.put(p);
583       }
584 
585       ClusterStatus status = admin.getClusterStatus();
586 
587       for (ServerName server : status.getServers()) {
588         ServerLoad sl = status.getLoad(server);
589         List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
590         ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
591 
592         // check SourceList has at least one entry
593         assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
594 
595         // check Sink exist only as it is difficult to verify the value on the fly
596         assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
597           (rLoadSink.getAgeOfLastAppliedOp() >= 0));
598         assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
599           (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
600       }
601     } finally {
602       admin.close();
603     }
604   }
605 }