View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication;
19  
20  import static org.junit.Assert.assertArrayEquals;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.fail;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.List;
27  import java.util.Random;
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.HBaseConfiguration;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.testclassification.LargeTests;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.client.Delete;
41  import org.apache.hadoop.hbase.client.Durability;
42  import org.apache.hadoop.hbase.client.Get;
43  import org.apache.hadoop.hbase.client.HBaseAdmin;
44  import org.apache.hadoop.hbase.client.HTable;
45  import org.apache.hadoop.hbase.client.Put;
46  import org.apache.hadoop.hbase.client.Result;
47  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
48  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
49  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
50  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
51  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
52  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56  import org.junit.After;
57  import org.junit.Before;
58  import org.junit.Test;
59  import org.junit.experimental.categories.Category;
60  
61  @Category(LargeTests.class)
62  public class TestMasterReplication {
63  
64    private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
65  
66    private Configuration baseConfiguration;
67  
68    private HBaseTestingUtility[] utilities;
69    private Configuration[] configurations;
70    private MiniZooKeeperCluster miniZK;
71  
72    private static final long SLEEP_TIME = 500;
73    private static final int NB_RETRIES = 10;
74  
75    private static final byte[] tableName = Bytes.toBytes("test");
76    private static final byte[] famName = Bytes.toBytes("f");
77    private static final byte[] row = Bytes.toBytes("row");
78    private static final byte[] row1 = Bytes.toBytes("row1");
79    private static final byte[] row2 = Bytes.toBytes("row2");
80    private static final byte[] row3 = Bytes.toBytes("row3");
81    private static final byte[] row4 = Bytes.toBytes("row4");
82    private static final byte[] noRepfamName = Bytes.toBytes("norep");
83  
84    private static final byte[] count = Bytes.toBytes("count");
85    private static final byte[] put = Bytes.toBytes("put");
86    private static final byte[] delete = Bytes.toBytes("delete");
87  
88    private HTableDescriptor table;
89  
90    @Before
91    public void setUp() throws Exception {
92      baseConfiguration = HBaseConfiguration.create();
93      // smaller block size and capacity to trigger more operations
94      // and test them
95      baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
96      baseConfiguration.setInt("replication.source.size.capacity", 1024);
97      baseConfiguration.setLong("replication.source.sleepforretries", 100);
98      baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
99      baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
100     baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
101         HConstants.REPLICATION_ENABLE_DEFAULT);
102     baseConfiguration.setBoolean("dfs.support.append", true);
103     baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
104     baseConfiguration.setStrings(
105         CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
106         CoprocessorCounter.class.getName());
107 
108     table = new HTableDescriptor(TableName.valueOf(tableName));
109     HColumnDescriptor fam = new HColumnDescriptor(famName);
110     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
111     table.addFamily(fam);
112     fam = new HColumnDescriptor(noRepfamName);
113     table.addFamily(fam);
114   }
115 
116   /**
117    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by
118    * adding and deleting a row to a table in each cluster, checking if it's
119    * replicated. It also tests that the puts and deletes are not replicated back
120    * to the originating cluster.
121    */
122   @Test(timeout = 300000)
123   public void testCyclicReplication1() throws Exception {
124     LOG.info("testSimplePutDelete");
125     int numClusters = 2;
126     HTable[] htables = null;
127     try {
128       startMiniClusters(numClusters);
129       createTableOnClusters(table);
130 
131       htables = getHTablesOnClusters(tableName);
132 
133       // Test the replication scenarios of 0 -> 1 -> 0
134       addPeer("1", 0, 1);
135       addPeer("1", 1, 0);
136 
137       int[] expectedCounts = new int[] { 2, 2 };
138 
139       // add rows to both clusters,
140       // make sure they are both replication
141       putAndWait(row, famName, htables[0], htables[1]);
142       putAndWait(row1, famName, htables[1], htables[0]);
143       validateCounts(htables, put, expectedCounts);
144 
145       deleteAndWait(row, htables[0], htables[1]);
146       deleteAndWait(row1, htables[1], htables[0]);
147       validateCounts(htables, delete, expectedCounts);
148     } finally {
149       close(htables);
150       shutDownMiniClusters();
151     }
152   }
153 
154   /**
155    * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
156    * deleting rows to a table in each clusters and ensuring that the each of
157    * these clusters get the appropriate mutations. It also tests the grouping
158    * scenario where a cluster needs to replicate the edits originating from
159    * itself and also the edits that it received using replication from a
160    * different cluster. The scenario is explained in HBASE-9158
161    */
162   @Test(timeout = 300000)
163   public void testCyclicReplication2() throws Exception {
164     LOG.info("testCyclicReplication1");
165     int numClusters = 3;
166     HTable[] htables = null;
167     try {
168       startMiniClusters(numClusters);
169       createTableOnClusters(table);
170 
171       // Test the replication scenario of 0 -> 1 -> 2 -> 0
172       addPeer("1", 0, 1);
173       addPeer("1", 1, 2);
174       addPeer("1", 2, 0);
175 
176       htables = getHTablesOnClusters(tableName);
177 
178       // put "row" and wait 'til it got around
179       putAndWait(row, famName, htables[0], htables[2]);
180       putAndWait(row1, famName, htables[1], htables[0]);
181       putAndWait(row2, famName, htables[2], htables[1]);
182 
183       deleteAndWait(row, htables[0], htables[2]);
184       deleteAndWait(row1, htables[1], htables[0]);
185       deleteAndWait(row2, htables[2], htables[1]);
186 
187       int[] expectedCounts = new int[] { 3, 3, 3 };
188       validateCounts(htables, put, expectedCounts);
189       validateCounts(htables, delete, expectedCounts);
190 
191       // Test HBASE-9158
192       disablePeer("1", 2);
193       // we now have an edit that was replicated into cluster originating from
194       // cluster 0
195       putAndWait(row3, famName, htables[0], htables[1]);
196       // now add a local edit to cluster 1
197       htables[1].put(new Put(row4).add(famName, row4, row4));
198       // re-enable replication from cluster 2 to cluster 0
199       enablePeer("1", 2);
200       // without HBASE-9158 the edit for row4 would have been marked with
201       // cluster 0's id
202       // and hence not replicated to cluster 0
203       wait(row4, htables[0], true);
204     } finally {
205       close(htables);
206       shutDownMiniClusters();
207     }
208   }
209 
210   /**
211    * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
212    */
213   @Test(timeout = 300000)
214   public void testCyclicReplication3() throws Exception {
215     LOG.info("testCyclicReplication2");
216     int numClusters = 3;
217     HTable[] htables = null;
218     try {
219       startMiniClusters(numClusters);
220       createTableOnClusters(table);
221 
222       // Test the replication scenario of 0 -> 1 -> 2 -> 1
223       addPeer("1", 0, 1);
224       addPeer("1", 1, 2);
225       addPeer("1", 2, 1);
226 
227       htables = getHTablesOnClusters(tableName);
228 
229       // put "row" and wait 'til it got around
230       putAndWait(row, famName, htables[0], htables[2]);
231       putAndWait(row1, famName, htables[1], htables[2]);
232       putAndWait(row2, famName, htables[2], htables[1]);
233 
234       deleteAndWait(row, htables[0], htables[2]);
235       deleteAndWait(row1, htables[1], htables[2]);
236       deleteAndWait(row2, htables[2], htables[1]);
237 
238       int[] expectedCounts = new int[] { 1, 3, 3 };
239       validateCounts(htables, put, expectedCounts);
240       validateCounts(htables, delete, expectedCounts);
241     } finally {
242       close(htables);
243       shutDownMiniClusters();
244     }
245   }
246 
247   @After
248   public void tearDown() throws IOException {
249     configurations = null;
250     utilities = null;
251   }
252 
253   @SuppressWarnings("resource")
254   private void startMiniClusters(int numClusters) throws Exception {
255     Random random = new Random();
256     utilities = new HBaseTestingUtility[numClusters];
257     configurations = new Configuration[numClusters];
258     for (int i = 0; i < numClusters; i++) {
259       Configuration conf = new Configuration(baseConfiguration);
260       conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
261       HBaseTestingUtility utility = new HBaseTestingUtility(conf);
262       if (i == 0) {
263         utility.startMiniZKCluster();
264         miniZK = utility.getZkCluster();
265       } else {
266         utility.setZkCluster(miniZK);
267       }
268       utility.startMiniCluster();
269       utilities[i] = utility;
270       configurations[i] = conf;
271       new ZooKeeperWatcher(conf, "cluster" + i, null, true);
272     }
273   }
274 
275   private void shutDownMiniClusters() throws Exception {
276     int numClusters = utilities.length;
277     for (int i = numClusters - 1; i >= 0; i--) {
278       if (utilities[i] != null) {
279         utilities[i].shutdownMiniCluster();
280       }
281     }
282     miniZK.shutdown();
283   }
284 
285   private void createTableOnClusters(HTableDescriptor table) throws Exception {
286     int numClusters = configurations.length;
287     for (int i = 0; i < numClusters; i++) {
288       HBaseAdmin hbaseAdmin = null;
289       try {
290         hbaseAdmin = new HBaseAdmin(configurations[i]);
291         hbaseAdmin.createTable(table);
292       } finally {
293         close(hbaseAdmin);
294       }
295     }
296   }
297 
298   private void addPeer(String id, int masterClusterNumber,
299       int slaveClusterNumber) throws Exception {
300     ReplicationAdmin replicationAdmin = null;
301     try {
302       replicationAdmin = new ReplicationAdmin(
303           configurations[masterClusterNumber]);
304       replicationAdmin.addPeer(id,
305           utilities[slaveClusterNumber].getClusterKey());
306     } finally {
307       close(replicationAdmin);
308     }
309   }
310 
311   private void disablePeer(String id, int masterClusterNumber) throws Exception {
312     ReplicationAdmin replicationAdmin = null;
313     try {
314       replicationAdmin = new ReplicationAdmin(
315           configurations[masterClusterNumber]);
316       replicationAdmin.disablePeer(id);
317     } finally {
318       close(replicationAdmin);
319     }
320   }
321 
322   private void enablePeer(String id, int masterClusterNumber) throws Exception {
323     ReplicationAdmin replicationAdmin = null;
324     try {
325       replicationAdmin = new ReplicationAdmin(
326           configurations[masterClusterNumber]);
327       replicationAdmin.enablePeer(id);
328     } finally {
329       close(replicationAdmin);
330     }
331   }
332 
333   private void close(Closeable... closeables) {
334     try {
335       if (closeables != null) {
336         for (Closeable closeable : closeables) {
337           closeable.close();
338         }
339       }
340     } catch (Exception e) {
341       LOG.warn("Exception occured while closing the object:", e);
342     }
343   }
344 
345   @SuppressWarnings("resource")
346   private HTable[] getHTablesOnClusters(byte[] tableName) throws Exception {
347     int numClusters = utilities.length;
348     HTable[] htables = new HTable[numClusters];
349     for (int i = 0; i < numClusters; i++) {
350       HTable htable = new HTable(configurations[i], tableName);
351       htable.setWriteBufferSize(1024);
352       htables[i] = htable;
353     }
354     return htables;
355   }
356 
357   private void validateCounts(HTable[] htables, byte[] type,
358       int[] expectedCounts) throws IOException {
359     for (int i = 0; i < htables.length; i++) {
360       assertEquals(Bytes.toString(type) + " were replicated back ",
361           expectedCounts[i], getCount(htables[i], type));
362     }
363   }
364 
365   private int getCount(HTable t, byte[] type) throws IOException {
366     Get test = new Get(row);
367     test.setAttribute("count", new byte[] {});
368     Result res = t.get(test);
369     return Bytes.toInt(res.getValue(count, type));
370   }
371 
372   private void deleteAndWait(byte[] row, HTable source, HTable target)
373       throws Exception {
374     Delete del = new Delete(row);
375     source.delete(del);
376     wait(row, target, true);
377   }
378 
379   private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
380       throws Exception {
381     Put put = new Put(row);
382     put.add(fam, row, row);
383     source.put(put);
384     wait(row, target, false);
385   }
386 
387   private void wait(byte[] row, HTable target, boolean isDeleted)
388       throws Exception {
389     Get get = new Get(row);
390     for (int i = 0; i < NB_RETRIES; i++) {
391       if (i == NB_RETRIES - 1) {
392         fail("Waited too much time for replication. Row:" + Bytes.toString(row)
393             + ". IsDeleteReplication:" + isDeleted);
394       }
395       Result res = target.get(get);
396       boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
397       if (sleep) {
398         LOG.info("Waiting for more time for replication. Row:"
399             + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
400         Thread.sleep(SLEEP_TIME);
401       } else {
402         if (!isDeleted) {
403           assertArrayEquals(res.value(), row);
404         }
405         LOG.info("Obtained row:"
406             + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
407         break;
408       }
409     }
410   }
411 
412   /**
413    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
414    * timestamp there is otherwise no way to count them.
415    */
416   public static class CoprocessorCounter extends BaseRegionObserver {
417     private int nCount = 0;
418     private int nDelete = 0;
419 
420     @Override
421     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
422         final WALEdit edit, final Durability durability) throws IOException {
423       nCount++;
424     }
425 
426     @Override
427     public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
428         final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
429       nDelete++;
430     }
431 
432     @Override
433     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
434         final Get get, final List<Cell> result) throws IOException {
435       if (get.getAttribute("count") != null) {
436         result.clear();
437         // order is important!
438         result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
439         result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
440         c.bypass();
441       }
442     }
443   }
444 
445 }