View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.*;
28  import org.apache.hadoop.hbase.client.Delete;
29  import org.apache.hadoop.hbase.client.HBaseAdmin;
30  import org.apache.hadoop.hbase.client.HTable;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
33  import org.apache.hadoop.hbase.testclassification.LargeTests;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
36  import org.junit.Before;
37  import org.junit.Test;
38  import org.junit.experimental.categories.Category;
39  
40  @Category(LargeTests.class)
41  public class TestReplicationSyncUpTool extends TestReplicationBase {
42  
43    private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
44  
45    private static final byte[] t1_su = Bytes.toBytes("t1_syncup");
46    private static final byte[] t2_su = Bytes.toBytes("t2_syncup");
47  
48    private static final byte[] famName = Bytes.toBytes("cf1");
49    private static final byte[] qualName = Bytes.toBytes("q1");
50  
51    private static final byte[] noRepfamName = Bytes.toBytes("norep");
52  
53    private HTableDescriptor t1_syncupSource, t1_syncupTarget;
54    private HTableDescriptor t2_syncupSource, t2_syncupTarget;
55  
56    private HTable ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
57  
58    @Before
59    public void setUp() throws Exception {
60  
61      HColumnDescriptor fam;
62  
63      t1_syncupSource = new HTableDescriptor(TableName.valueOf(t1_su));
64      fam = new HColumnDescriptor(famName);
65      fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
66      t1_syncupSource.addFamily(fam);
67      fam = new HColumnDescriptor(noRepfamName);
68      t1_syncupSource.addFamily(fam);
69  
70      t1_syncupTarget = new HTableDescriptor(TableName.valueOf(t1_su));
71      fam = new HColumnDescriptor(famName);
72      t1_syncupTarget.addFamily(fam);
73      fam = new HColumnDescriptor(noRepfamName);
74      t1_syncupTarget.addFamily(fam);
75  
76      t2_syncupSource = new HTableDescriptor(TableName.valueOf(t2_su));
77      fam = new HColumnDescriptor(famName);
78      fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
79      t2_syncupSource.addFamily(fam);
80      fam = new HColumnDescriptor(noRepfamName);
81      t2_syncupSource.addFamily(fam);
82  
83      t2_syncupTarget = new HTableDescriptor(TableName.valueOf(t2_su));
84      fam = new HColumnDescriptor(famName);
85      t2_syncupTarget.addFamily(fam);
86      fam = new HColumnDescriptor(noRepfamName);
87      t2_syncupTarget.addFamily(fam);
88  
89    }
90  
91    /**
92     * Add a row to a table in each cluster, check it's replicated, delete it,
93     * check's gone Also check the puts and deletes are not replicated back to
94     * the originating cluster.
95     */
96    @Test(timeout = 300000)
97    public void testSyncUpTool() throws Exception {
98  
99      /**
100      * Set up Replication: on Master and one Slave
101      * Table: t1_syncup and t2_syncup
102      * columnfamily:
103      *    'cf1'  : replicated
104      *    'norep': not replicated
105      */
106     setupReplication();
107 
108     /**
109      * at Master:
110      * t1_syncup: put 100 rows into cf1, and 1 rows into norep
111      * t2_syncup: put 200 rows into cf1, and 1 rows into norep
112      *
113      * verify correctly replicated to slave
114      */
115     putAndReplicateRows();
116 
117     /**
118      * Verify delete works
119      *
120      * step 1: stop hbase on Slave
121      *
122      * step 2: at Master:
123      *  t1_syncup: delete 50 rows  from cf1
124      *  t2_syncup: delete 100 rows from cf1
125      *  no change on 'norep'
126      *
127      * step 3: stop hbase on master, restart hbase on Slave
128      *
129      * step 4: verify Slave still have the rows before delete
130      *      t1_syncup: 100 rows from cf1
131      *      t2_syncup: 200 rows from cf1
132      *
133      * step 5: run syncup tool on Master
134      *
135      * step 6: verify that delete show up on Slave
136      *      t1_syncup: 50 rows from cf1
137      *      t2_syncup: 100 rows from cf1
138      *
139      * verify correctly replicated to Slave
140      */
141     mimicSyncUpAfterDelete();
142 
143     /**
144      * Verify put works
145      *
146      * step 1: stop hbase on Slave
147      *
148      * step 2: at Master:
149      *  t1_syncup: put 100 rows  from cf1
150      *  t2_syncup: put 200 rows  from cf1
151      *  and put another row on 'norep'
152      *  ATTN: put to 'cf1' will overwrite existing rows, so end count will
153      *        be 100 and 200 respectively
154      *      put to 'norep' will add a new row.
155      *
156      * step 3: stop hbase on master, restart hbase on Slave
157      *
158      * step 4: verify Slave still has the rows before put
159      *      t1_syncup: 50 rows from cf1
160      *      t2_syncup: 100 rows from cf1
161      *
162      * step 5: run syncup tool on Master
163      *
164      * step 6: verify that put show up on Slave
165      *         and 'norep' does not
166      *      t1_syncup: 100 rows from cf1
167      *      t2_syncup: 200 rows from cf1
168      *
169      * verify correctly replicated to Slave
170      */
171     mimicSyncUpAfterPut();
172 
173   }
174 
175   private void setupReplication() throws Exception {
176     ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
177     ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
178 
179     HBaseAdmin ha = new HBaseAdmin(conf1);
180     ha.createTable(t1_syncupSource);
181     ha.createTable(t2_syncupSource);
182     ha.close();
183 
184     ha = new HBaseAdmin(conf2);
185     ha.createTable(t1_syncupTarget);
186     ha.createTable(t2_syncupTarget);
187     ha.close();
188 
189     // Get HTable from Master
190     ht1Source = new HTable(conf1, t1_su);
191     ht1Source.setWriteBufferSize(1024);
192     ht2Source = new HTable(conf1, t2_su);
193     ht1Source.setWriteBufferSize(1024);
194 
195     // Get HTable from Peer1
196     ht1TargetAtPeer1 = new HTable(conf2, t1_su);
197     ht1TargetAtPeer1.setWriteBufferSize(1024);
198     ht2TargetAtPeer1 = new HTable(conf2, t2_su);
199     ht2TargetAtPeer1.setWriteBufferSize(1024);
200 
201     /**
202      * set M-S : Master: utility1 Slave1: utility2
203      */
204     admin1.addPeer("1", utility2.getClusterKey());
205 
206     admin1.close();
207     admin2.close();
208   }
209 
210   private void putAndReplicateRows() throws Exception {
211     LOG.debug("putAndReplicateRows");
212     // add rows to Master cluster,
213     Put p;
214 
215     // 100 + 1 row to t1_syncup
216     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
217       p = new Put(Bytes.toBytes("row" + i));
218       p.add(famName, qualName, Bytes.toBytes("val" + i));
219       ht1Source.put(p);
220     }
221     p = new Put(Bytes.toBytes("row" + 9999));
222     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
223     ht1Source.put(p);
224 
225     // 200 + 1 row to t2_syncup
226     for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
227       p = new Put(Bytes.toBytes("row" + i));
228       p.add(famName, qualName, Bytes.toBytes("val" + i));
229       ht2Source.put(p);
230     }
231     p = new Put(Bytes.toBytes("row" + 9999));
232     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
233     ht2Source.put(p);
234 
235     // ensure replication completed
236     Thread.sleep(SLEEP_TIME);
237     int rowCount_ht1Source = utility1.countRows(ht1Source);
238     for (int i = 0; i < NB_RETRIES; i++) {
239       int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
240       if (i==NB_RETRIES-1) {
241         assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
242             rowCount_ht1TargetAtPeer1);
243       }
244       if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
245         break;
246       }
247       Thread.sleep(SLEEP_TIME);
248     }
249 
250     int rowCount_ht2Source = utility1.countRows(ht2Source);
251     for (int i = 0; i < NB_RETRIES; i++) {
252       int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
253       if (i==NB_RETRIES-1) {
254         assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
255             rowCount_ht2TargetAtPeer1);
256       }
257       if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
258         break;
259       }
260       Thread.sleep(SLEEP_TIME);
261     }
262   }
263 
264   private void mimicSyncUpAfterDelete() throws Exception {
265     LOG.debug("mimicSyncUpAfterDelete");
266     utility2.shutdownMiniHBaseCluster();
267 
268     List<Delete> list = new ArrayList<Delete>();
269     // delete half of the rows
270     for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
271       String rowKey = "row" + i;
272       Delete del = new Delete(rowKey.getBytes());
273       list.add(del);
274     }
275     ht1Source.delete(list);
276 
277     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
278       String rowKey = "row" + i;
279       Delete del = new Delete(rowKey.getBytes());
280       list.add(del);
281     }
282     ht2Source.delete(list);
283 
284     int rowCount_ht1Source = utility1.countRows(ht1Source);
285     assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
286       rowCount_ht1Source);
287 
288     int rowCount_ht2Source = utility1.countRows(ht2Source);
289     assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
290       101, rowCount_ht2Source);
291 
292     utility1.shutdownMiniHBaseCluster();
293     utility2.restartHBaseCluster(1);
294 
295     Thread.sleep(SLEEP_TIME);
296 
297     // before sync up
298     int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
299     int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
300     assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
301     assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
302 
303     // After sync up
304     for (int i = 0; i < NB_RETRIES; i++) {
305       syncUp(utility1);
306       rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
307       rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
308       if (i == NB_RETRIES - 1) {
309         if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
310           // syncUP still failed. Let's look at the source in case anything wrong there
311           utility1.restartHBaseCluster(1);
312           rowCount_ht1Source = utility1.countRows(ht1Source);
313           LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
314           rowCount_ht2Source = utility1.countRows(ht2Source);
315           LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
316         }
317         assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
318           rowCount_ht1TargetAtPeer1);
319         assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
320           rowCount_ht2TargetAtPeer1);
321       }
322       if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
323         LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
324         break;
325       } else {
326         LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
327             + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
328             + rowCount_ht2TargetAtPeer1);
329       }
330       Thread.sleep(SLEEP_TIME);
331     }
332   }
333 
334   private void mimicSyncUpAfterPut() throws Exception {
335     LOG.debug("mimicSyncUpAfterPut");
336     utility1.restartHBaseCluster(1);
337     utility2.shutdownMiniHBaseCluster();
338 
339     Put p;
340     // another 100 + 1 row to t1_syncup
341     // we should see 100 + 2 rows now
342     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
343       p = new Put(Bytes.toBytes("row" + i));
344       p.add(famName, qualName, Bytes.toBytes("val" + i));
345       ht1Source.put(p);
346     }
347     p = new Put(Bytes.toBytes("row" + 9998));
348     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
349     ht1Source.put(p);
350 
351     // another 200 + 1 row to t1_syncup
352     // we should see 200 + 2 rows now
353     for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
354       p = new Put(Bytes.toBytes("row" + i));
355       p.add(famName, qualName, Bytes.toBytes("val" + i));
356       ht2Source.put(p);
357     }
358     p = new Put(Bytes.toBytes("row" + 9998));
359     p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
360     ht2Source.put(p);
361 
362     int rowCount_ht1Source = utility1.countRows(ht1Source);
363     assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
364     int rowCount_ht2Source = utility1.countRows(ht2Source);
365     assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
366 
367     utility1.shutdownMiniHBaseCluster();
368     utility2.restartHBaseCluster(1);
369 
370     Thread.sleep(SLEEP_TIME);
371 
372     // before sync up
373     int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
374     int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
375     assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
376       rowCount_ht1TargetAtPeer1);
377     assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
378       rowCount_ht2TargetAtPeer1);
379 
380     // after syun up
381     for (int i = 0; i < NB_RETRIES; i++) {
382       syncUp(utility1);
383       rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
384       rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
385       if (i == NB_RETRIES - 1) {
386         if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
387           // syncUP still failed. Let's look at the source in case anything wrong there
388           utility1.restartHBaseCluster(1);
389           rowCount_ht1Source = utility1.countRows(ht1Source);
390           LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
391           rowCount_ht2Source = utility1.countRows(ht2Source);
392           LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
393         }
394         assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
395           rowCount_ht1TargetAtPeer1);
396         assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
397           rowCount_ht2TargetAtPeer1);
398       }
399       if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
400         LOG.info("SyncUpAfterPut succeeded at retry = " + i);
401         break;
402       } else {
403         LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
404             + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
405             + rowCount_ht2TargetAtPeer1);
406       }
407       Thread.sleep(SLEEP_TIME);
408     }
409   }
410 
411   private void syncUp(HBaseTestingUtility ut) throws Exception {
412     ReplicationSyncUp.setConfigure(ut.getConfiguration());
413     String[] arguments = new String[] { null };
414     new ReplicationSyncUp().run(arguments);
415   }
416 
417 }