1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication;
21
22 import static org.junit.Assert.assertArrayEquals;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.*;
32 import org.apache.hadoop.hbase.client.Delete;
33 import org.apache.hadoop.hbase.client.Get;
34 import org.apache.hadoop.hbase.client.HBaseAdmin;
35 import org.apache.hadoop.hbase.client.HTable;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Result;
38 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
39 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
40 import org.apache.hadoop.hbase.testclassification.LargeTests;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
43 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44 import org.junit.BeforeClass;
45 import org.junit.Test;
46 import org.junit.experimental.categories.Category;
47
48 @Category(LargeTests.class)
49 public class TestMultiSlaveReplication {
50
51 private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
52
53 private static Configuration conf1;
54 private static Configuration conf2;
55 private static Configuration conf3;
56
57 private static HBaseTestingUtility utility1;
58 private static HBaseTestingUtility utility2;
59 private static HBaseTestingUtility utility3;
60 private static final long SLEEP_TIME = 500;
61 private static final int NB_RETRIES = 100;
62
63 private static final byte[] tableName = Bytes.toBytes("test");
64 private static final byte[] famName = Bytes.toBytes("f");
65 private static final byte[] row = Bytes.toBytes("row");
66 private static final byte[] row1 = Bytes.toBytes("row1");
67 private static final byte[] row2 = Bytes.toBytes("row2");
68 private static final byte[] row3 = Bytes.toBytes("row3");
69 private static final byte[] noRepfamName = Bytes.toBytes("norep");
70
71 private static HTableDescriptor table;
72
73 @BeforeClass
74 public static void setUpBeforeClass() throws Exception {
75 conf1 = HBaseConfiguration.create();
76 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
77
78
79 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
80 conf1.setInt("replication.source.size.capacity", 1024);
81 conf1.setLong("replication.source.sleepforretries", 100);
82 conf1.setInt("hbase.regionserver.maxlogs", 10);
83 conf1.setLong("hbase.master.logcleaner.ttl", 10);
84 conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
85 conf1.setBoolean("dfs.support.append", true);
86 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
87 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
88 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
89
90 utility1 = new HBaseTestingUtility(conf1);
91 utility1.startMiniZKCluster();
92 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
93 new ZooKeeperWatcher(conf1, "cluster1", null, true);
94
95 conf2 = new Configuration(conf1);
96 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
97
98 conf3 = new Configuration(conf1);
99 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
100
101 utility2 = new HBaseTestingUtility(conf2);
102 utility2.setZkCluster(miniZK);
103 new ZooKeeperWatcher(conf2, "cluster3", null, true);
104
105 utility3 = new HBaseTestingUtility(conf3);
106 utility3.setZkCluster(miniZK);
107 new ZooKeeperWatcher(conf3, "cluster3", null, true);
108
109 table = new HTableDescriptor(TableName.valueOf(tableName));
110 HColumnDescriptor fam = new HColumnDescriptor(famName);
111 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
112 table.addFamily(fam);
113 fam = new HColumnDescriptor(noRepfamName);
114 table.addFamily(fam);
115 }
116
117 @Test(timeout=300000)
118 public void testMultiSlaveReplication() throws Exception {
119 LOG.info("testCyclicReplication");
120 MiniHBaseCluster master = utility1.startMiniCluster();
121 utility2.startMiniCluster();
122 utility3.startMiniCluster();
123 ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
124
125 new HBaseAdmin(conf1).createTable(table);
126 new HBaseAdmin(conf2).createTable(table);
127 new HBaseAdmin(conf3).createTable(table);
128 HTable htable1 = new HTable(conf1, tableName);
129 htable1.setWriteBufferSize(1024);
130 HTable htable2 = new HTable(conf2, tableName);
131 htable2.setWriteBufferSize(1024);
132 HTable htable3 = new HTable(conf3, tableName);
133 htable3.setWriteBufferSize(1024);
134
135 admin1.addPeer("1", utility2.getClusterKey());
136
137
138 putAndWait(row, famName, htable1, htable2);
139 deleteAndWait(row, htable1, htable2);
140
141 checkRow(row,0,htable3);
142
143 putAndWait(row2, famName, htable1, htable2);
144
145
146 new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString());
147
148 putAndWait(row3, famName, htable1, htable2);
149
150 admin1.addPeer("2", utility3.getClusterKey());
151
152
153 putAndWait(row1, famName, htable1, htable2, htable3);
154
155 deleteAndWait(row1, htable1, htable2, htable3);
156
157
158
159 checkRow(row2,0,htable3);
160
161
162
163 checkRow(row3,1,htable3);
164
165 Put p = new Put(row);
166 p.add(famName, row, row);
167 htable1.put(p);
168
169 new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0)
170 .getServerName().toString());
171
172
173
174 deleteAndWait(row2, htable1, htable2, htable3);
175
176
177 checkRow(row, 1, htable2);
178
179
180 checkWithWait(row, 1, htable3);
181
182
183 deleteAndWait(row, htable1, htable2, htable3);
184 deleteAndWait(row3, htable1, htable2, htable3);
185
186 utility3.shutdownMiniCluster();
187 utility2.shutdownMiniCluster();
188 utility1.shutdownMiniCluster();
189 }
190
191 private void checkWithWait(byte[] row, int count, HTable table) throws Exception {
192 Get get = new Get(row);
193 for (int i = 0; i < NB_RETRIES; i++) {
194 if (i == NB_RETRIES - 1) {
195 fail("Waited too much time while getting the row.");
196 }
197 boolean rowReplicated = false;
198 Result res = table.get(get);
199 if (res.size() >= 1) {
200 LOG.info("Row is replicated");
201 rowReplicated = true;
202 assertEquals(count, res.size());
203 break;
204 }
205 if (rowReplicated) {
206 break;
207 } else {
208 Thread.sleep(SLEEP_TIME);
209 }
210 }
211 }
212
213 private void checkRow(byte[] row, int count, HTable... tables) throws IOException {
214 Get get = new Get(row);
215 for (HTable table : tables) {
216 Result res = table.get(get);
217 assertEquals(count, res.size());
218 }
219 }
220
221 private void deleteAndWait(byte[] row, HTable source, HTable... targets)
222 throws Exception {
223 Delete del = new Delete(row);
224 source.delete(del);
225
226 Get get = new Get(row);
227 for (int i = 0; i < NB_RETRIES; i++) {
228 if (i==NB_RETRIES-1) {
229 fail("Waited too much time for del replication");
230 }
231 boolean removedFromAll = true;
232 for (HTable target : targets) {
233 Result res = target.get(get);
234 if (res.size() >= 1) {
235 LOG.info("Row not deleted");
236 removedFromAll = false;
237 break;
238 }
239 }
240 if (removedFromAll) {
241 break;
242 } else {
243 Thread.sleep(SLEEP_TIME);
244 }
245 }
246 }
247
248 private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
249 throws Exception {
250 Put put = new Put(row);
251 put.add(fam, row, row);
252 source.put(put);
253
254 Get get = new Get(row);
255 for (int i = 0; i < NB_RETRIES; i++) {
256 if (i==NB_RETRIES-1) {
257 fail("Waited too much time for put replication");
258 }
259 boolean replicatedToAll = true;
260 for (HTable target : targets) {
261 Result res = target.get(get);
262 if (res.size() == 0) {
263 LOG.info("Row not available");
264 replicatedToAll = false;
265 break;
266 } else {
267 assertArrayEquals(res.value(), row);
268 }
269 }
270 if (replicatedToAll) {
271 break;
272 } else {
273 Thread.sleep(SLEEP_TIME);
274 }
275 }
276 }
277
278 }
279