
/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

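/**
 * Tests replication from a single source cluster to two slave clusters:
 * verifies that edits reach only the clusters registered as peers, that
 * deletes are propagated, and that rolling the logs does not lose or
 * replay edits when a new peer is added.
 */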
@Category(LargeTests.class)
public class TestMultiSlaveReplication {

  private static final Log LOG = LogFactory.getLog(TestMultiSlaveReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static HBaseTestingUtility utility3;
  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 100;

  private static final byte[] tableName = Bytes.toBytes("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static HTableDescriptor table;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller block size and capacity to trigger more operations
    // and test them
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
    conf1.setBoolean("dfs.support.append", true);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    new ZooKeeperWatcher(conf1, "cluster1", null, true);

    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    new ZooKeeperWatcher(conf2, "cluster2", null, true);

    utility3 = new HBaseTestingUtility(conf3);
    utility3.setZkCluster(miniZK);
    new ZooKeeperWatcher(conf3, "cluster3", null, true);

    table = new HTableDescriptor(TableName.valueOf(tableName));
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }

  @Test(timeout=300000)
  public void testMultiSlaveReplication() throws Exception {
    LOG.info("testMultiSlaveReplication");
    MiniHBaseCluster master = utility1.startMiniCluster();
    utility2.startMiniCluster();
    utility3.startMiniCluster();
    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);

    new HBaseAdmin(conf1).createTable(table);
    new HBaseAdmin(conf2).createTable(table);
    new HBaseAdmin(conf3).createTable(table);
    HTable htable1 = new HTable(conf1, tableName);
    htable1.setWriteBufferSize(1024);
    HTable htable2 = new HTable(conf2, tableName);
    htable2.setWriteBufferSize(1024);
    HTable htable3 = new HTable(conf3, tableName);
    htable3.setWriteBufferSize(1024);

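    // make cluster 2 a replication peer of cluster 1; cluster 3 is added later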
    admin1.addPeer("1", utility2.getClusterKey());

    // put "row" and wait until it has been replicated, then delete it
    putAndWait(row, famName, htable1, htable2);
    deleteAndWait(row, htable1, htable2);
    // check it was not replicated to cluster 3
    checkRow(row, 0, htable3);

    putAndWait(row2, famName, htable1, htable2);

    // now roll the region server's logs
    new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString());
    // after the log was rolled put a new row
    putAndWait(row3, famName, htable1, htable2);

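    // add cluster 3 as a second slave; edits already sitting in rolled logs
    // (row2) should not reach it, while edits in the current log (row3) should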
    admin1.addPeer("2", utility3.getClusterKey());

    // put a row, check it was replicated to all clusters
    putAndWait(row1, famName, htable1, htable2, htable3);
    // delete and verify
    deleteAndWait(row1, htable1, htable2, htable3);

    // make sure row2 did not get replicated after
    // cluster 3 was added
    checkRow(row2, 0, htable3);

    // row3 will get replicated, because it was in the
    // latest log
    checkRow(row3, 1, htable3);

    Put p = new Put(row);
    p.add(famName, row, row);
    htable1.put(p);
    // now roll the logs again
    new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0)
        .getServerName().toString());

    // cleanup "row2"; this also conveniently waits for replication
    // to finish
    deleteAndWait(row2, htable1, htable2, htable3);
    // even if the log was rolled in the middle of the replication,
    // "row" is still replicated
    checkRow(row, 1, htable2);
    // the replication thread for peer 2 may be sleeping, and since row2 never
    // reached cluster 3, the delete wait above does not guarantee it has caught
    // up, so retry while checking
    checkWithWait(row, 1, htable3);

    // cleanup the rest
    deleteAndWait(row, htable1, htable2, htable3);
    deleteAndWait(row3, htable1, htable2, htable3);

    utility3.shutdownMiniCluster();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

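  /**
   * Gets {@code row} from {@code table}, retrying up to NB_RETRIES times with
   * SLEEP_TIME pauses between attempts, and asserts it has {@code count} cells
   * once it shows up; fails if the row never appears.
   */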
  private void checkWithWait(byte[] row, int count, HTable table) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time while getting the row.");
      }
      Result res = table.get(get);
      if (res.size() >= 1) {
        LOG.info("Row is replicated");
        assertEquals(count, res.size());
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

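  /**
   * Asserts that {@code row} has exactly {@code count} cells in each of the
   * given tables, without retrying.
   */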
  private void checkRow(byte[] row, int count, HTable... tables) throws IOException {
    Get get = new Get(row);
    for (HTable table : tables) {
      Result res = table.get(get);
      assertEquals(count, res.size());
    }
  }

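  /**
   * Deletes {@code row} from the source table and waits until the delete has
   * been replicated to every target table, failing after NB_RETRIES attempts.
   */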
  private void deleteAndWait(byte[] row, HTable source, HTable... targets)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (HTable target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

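  /**
   * Puts {@code row} into the source table and waits until the value has been
   * replicated to every target table, failing after NB_RETRIES attempts.
   */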
  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
      throws Exception {
    Put put = new Put(row);
    put.add(fam, row, row);
    source.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (HTable target : targets) {
        Result res = target.get(get);
        if (res.size() == 0) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          assertArrayEquals(row, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

}