View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.UUID;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.testclassification.MediumTests;
31  import org.apache.hadoop.hbase.Waiter;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
35  import org.apache.hadoop.hbase.regionserver.wal.HLog;
36  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
39  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40  import org.junit.AfterClass;
41  import org.junit.Assert;
42  import org.junit.Before;
43  import org.junit.BeforeClass;
44  import org.junit.Test;
45  import org.junit.experimental.categories.Category;
46  
47  /**
48   * Tests ReplicationSource and ReplicationEndpoint interactions
49   */
50  @Category(MediumTests.class)
51  public class TestReplicationEndpoint extends TestReplicationBase {
52  
53    static int numRegionServers;
54  
55    @BeforeClass
56    public static void setUpBeforeClass() throws Exception {
57      TestReplicationBase.setUpBeforeClass();
58      utility2.shutdownMiniCluster(); // we don't need the second cluster
59      admin.removePeer("2");
60      numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
61    }
62  
63    @AfterClass
64    public static void tearDownAfterClass() throws Exception {
65      TestReplicationBase.tearDownAfterClass();
66      // check stop is called
67      Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
68    }
69  
70    @Before
71    public void setup() throws FailedLogCloseException, IOException {
72      ReplicationEndpointForTest.contructedCount.set(0);
73      ReplicationEndpointForTest.startedCount.set(0);
74      ReplicationEndpointForTest.replicateCount.set(0);
75      ReplicationEndpointForTest.lastEntries = null;
76      for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
77        utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
78      }
79    }
80  
81    @Test
82    public void testCustomReplicationEndpoint() throws Exception {
83      // test installing a custom replication endpoint other than the default one.
84      admin.addPeer("testCustomReplicationEndpoint",
85        new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
86          .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
87  
88      // check whether the class has been constructed and started
89      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
90        @Override
91        public boolean evaluate() throws Exception {
92          return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
93        }
94      });
95  
96      Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
97        @Override
98        public boolean evaluate() throws Exception {
99          return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
100       }
101     });
102 
103     Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
104 
105     // now replicate some data.
106     doPut(Bytes.toBytes("row42"));
107 
108     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
109       @Override
110       public boolean evaluate() throws Exception {
111         return ReplicationEndpointForTest.replicateCount.get() >= 1;
112       }
113     });
114 
115     doAssert(Bytes.toBytes("row42"));
116 
117     admin.removePeer("testCustomReplicationEndpoint");
118   }
119 
120   @Test
121   public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
122     admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
123       new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
124         .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
125     // now replicate some data.
126     doPut(row);
127 
128     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
129       @Override
130       public boolean evaluate() throws Exception {
131         return ReplicationEndpointReturningFalse.replicated.get();
132       }
133     });
134     if (ReplicationEndpointReturningFalse.ex.get() != null) {
135       throw ReplicationEndpointReturningFalse.ex.get();
136     }
137 
138     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
139   }
140 
141   @Test
142   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
143     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
144       new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
145         .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
146     // now replicate some data.
147     doPut(Bytes.toBytes("row1"));
148     doPut(row);
149     doPut(Bytes.toBytes("row2"));
150 
151     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
152       @Override
153       public boolean evaluate() throws Exception {
154         return ReplicationEndpointForTest.replicateCount.get() >= 1;
155       }
156     });
157 
158     Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
159     admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
160   }
161 
162 
163   private void doPut(byte[] row) throws IOException {
164     Put put = new Put(row);
165     put.add(famName, row, row);
166     htable1 = new HTable(conf1, tableName);
167     htable1.put(put);
168     htable1.close();
169   }
170 
171   private static void doAssert(byte[] row) throws Exception {
172     if (ReplicationEndpointForTest.lastEntries == null) {
173       return; // first call
174     }
175     Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
176     List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
177     Assert.assertEquals(1, kvs.size());
178     Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
179       kvs.get(0).getRowLength(), row, 0, row.length));
180   }
181 
182   public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
183     static UUID uuid = UUID.randomUUID();
184     static AtomicInteger contructedCount = new AtomicInteger();
185     static AtomicInteger startedCount = new AtomicInteger();
186     static AtomicInteger stoppedCount = new AtomicInteger();
187     static AtomicInteger replicateCount = new AtomicInteger();
188     static volatile List<HLog.Entry> lastEntries = null;
189 
190     public ReplicationEndpointForTest() {
191       contructedCount.incrementAndGet();
192     }
193 
194     @Override
195     public UUID getPeerUUID() {
196       return uuid;
197     }
198 
199     @Override
200     public boolean replicate(ReplicateContext replicateContext) {
201       replicateCount.incrementAndGet();
202       lastEntries = replicateContext.entries;
203       return true;
204     }
205 
206     @Override
207     protected void doStart() {
208       startedCount.incrementAndGet();
209       notifyStarted();
210     }
211 
212     @Override
213     protected void doStop() {
214       stoppedCount.incrementAndGet();
215       notifyStopped();
216     }
217   }
218 
219   public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
220     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
221     static AtomicBoolean replicated = new AtomicBoolean(false);
222     @Override
223     public boolean replicate(ReplicateContext replicateContext) {
224       try {
225         // check row
226         doAssert(row);
227       } catch (Exception e) {
228         ex.set(e);
229       }
230 
231       super.replicate(replicateContext);
232 
233       replicated.set(replicateCount.get() > 10); // first 10 times, we return false
234       return replicated.get();
235     }
236   }
237 
238   // return a WALEntry filter which only accepts "row", but not other rows
239   public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
240     static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
241 
242     @Override
243     public boolean replicate(ReplicateContext replicateContext) {
244       try {
245         super.replicate(replicateContext);
246         doAssert(row);
247       } catch (Exception e) {
248         ex.set(e);
249       }
250       return true;
251     }
252 
253     @Override
254     public WALEntryFilter getWALEntryfilter() {
255       return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
256         @Override
257         public Entry filter(Entry entry) {
258           ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
259           int size = kvs.size();
260           for (int i = size-1; i >= 0; i--) {
261             KeyValue kv = kvs.get(i);
262             if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
263               row, 0, row.length)) {
264               kvs.remove(i);
265             }
266           }
267           return entry;
268         }
269       });
270     }
271   }
272 }