1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.UUID;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.testclassification.MediumTests;
31 import org.apache.hadoop.hbase.Waiter;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
35 import org.apache.hadoop.hbase.regionserver.wal.HLog;
36 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.junit.AfterClass;
41 import org.junit.Assert;
42 import org.junit.Before;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47
48
49
50 @Category(MediumTests.class)
51 public class TestReplicationEndpoint extends TestReplicationBase {
52
53 static int numRegionServers;
54
55 @BeforeClass
56 public static void setUpBeforeClass() throws Exception {
57 TestReplicationBase.setUpBeforeClass();
58 utility2.shutdownMiniCluster();
59 admin.removePeer("2");
60 numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
61 }
62
63 @AfterClass
64 public static void tearDownAfterClass() throws Exception {
65 TestReplicationBase.tearDownAfterClass();
66
67 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
68 }
69
70 @Before
71 public void setup() throws FailedLogCloseException, IOException {
72 ReplicationEndpointForTest.contructedCount.set(0);
73 ReplicationEndpointForTest.startedCount.set(0);
74 ReplicationEndpointForTest.replicateCount.set(0);
75 ReplicationEndpointForTest.lastEntries = null;
76 for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
77 utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
78 }
79 }
80
81 @Test
82 public void testCustomReplicationEndpoint() throws Exception {
83
84 admin.addPeer("testCustomReplicationEndpoint",
85 new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
86 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
87
88
89 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
90 @Override
91 public boolean evaluate() throws Exception {
92 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
93 }
94 });
95
96 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
97 @Override
98 public boolean evaluate() throws Exception {
99 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
100 }
101 });
102
103 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
104
105
106 doPut(Bytes.toBytes("row42"));
107
108 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
109 @Override
110 public boolean evaluate() throws Exception {
111 return ReplicationEndpointForTest.replicateCount.get() >= 1;
112 }
113 });
114
115 doAssert(Bytes.toBytes("row42"));
116
117 admin.removePeer("testCustomReplicationEndpoint");
118 }
119
120 @Test
121 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
122 admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
123 new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
124 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
125
126 doPut(row);
127
128 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
129 @Override
130 public boolean evaluate() throws Exception {
131 return ReplicationEndpointReturningFalse.replicated.get();
132 }
133 });
134 if (ReplicationEndpointReturningFalse.ex.get() != null) {
135 throw ReplicationEndpointReturningFalse.ex.get();
136 }
137
138 admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
139 }
140
141 @Test
142 public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
143 admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
144 new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
145 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
146
147 doPut(Bytes.toBytes("row1"));
148 doPut(row);
149 doPut(Bytes.toBytes("row2"));
150
151 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
152 @Override
153 public boolean evaluate() throws Exception {
154 return ReplicationEndpointForTest.replicateCount.get() >= 1;
155 }
156 });
157
158 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
159 admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
160 }
161
162
163 private void doPut(byte[] row) throws IOException {
164 Put put = new Put(row);
165 put.add(famName, row, row);
166 htable1 = new HTable(conf1, tableName);
167 htable1.put(put);
168 htable1.close();
169 }
170
171 private static void doAssert(byte[] row) throws Exception {
172 if (ReplicationEndpointForTest.lastEntries == null) {
173 return;
174 }
175 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
176 List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
177 Assert.assertEquals(1, kvs.size());
178 Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
179 kvs.get(0).getRowLength(), row, 0, row.length));
180 }
181
182 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
183 static UUID uuid = UUID.randomUUID();
184 static AtomicInteger contructedCount = new AtomicInteger();
185 static AtomicInteger startedCount = new AtomicInteger();
186 static AtomicInteger stoppedCount = new AtomicInteger();
187 static AtomicInteger replicateCount = new AtomicInteger();
188 static volatile List<HLog.Entry> lastEntries = null;
189
190 public ReplicationEndpointForTest() {
191 contructedCount.incrementAndGet();
192 }
193
194 @Override
195 public UUID getPeerUUID() {
196 return uuid;
197 }
198
199 @Override
200 public boolean replicate(ReplicateContext replicateContext) {
201 replicateCount.incrementAndGet();
202 lastEntries = replicateContext.entries;
203 return true;
204 }
205
206 @Override
207 protected void doStart() {
208 startedCount.incrementAndGet();
209 notifyStarted();
210 }
211
212 @Override
213 protected void doStop() {
214 stoppedCount.incrementAndGet();
215 notifyStopped();
216 }
217 }
218
219 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
220 static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
221 static AtomicBoolean replicated = new AtomicBoolean(false);
222 @Override
223 public boolean replicate(ReplicateContext replicateContext) {
224 try {
225
226 doAssert(row);
227 } catch (Exception e) {
228 ex.set(e);
229 }
230
231 super.replicate(replicateContext);
232
233 replicated.set(replicateCount.get() > 10);
234 return replicated.get();
235 }
236 }
237
238
239 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
240 static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
241
242 @Override
243 public boolean replicate(ReplicateContext replicateContext) {
244 try {
245 super.replicate(replicateContext);
246 doAssert(row);
247 } catch (Exception e) {
248 ex.set(e);
249 }
250 return true;
251 }
252
253 @Override
254 public WALEntryFilter getWALEntryfilter() {
255 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
256 @Override
257 public Entry filter(Entry entry) {
258 ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
259 int size = kvs.size();
260 for (int i = size-1; i >= 0; i--) {
261 KeyValue kv = kvs.get(i);
262 if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
263 row, 0, row.length)) {
264 kvs.remove(i);
265 }
266 }
267 return entry;
268 }
269 });
270 }
271 }
272 }