1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.util.Random;
29 import java.util.concurrent.Semaphore;
30
31 import junit.framework.Assert;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.*;
35 import org.apache.hadoop.hbase.master.TestActiveMasterManager.NodeDeletionListener;
36 import org.apache.hadoop.hbase.testclassification.MediumTests;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.util.Threads;
39 import org.apache.zookeeper.CreateMode;
40 import org.apache.zookeeper.WatchedEvent;
41 import org.apache.zookeeper.Watcher;
42 import org.apache.zookeeper.ZooDefs.Ids;
43 import org.apache.zookeeper.ZooKeeper;
44 import org.junit.AfterClass;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 @Category(MediumTests.class)
50 public class TestZooKeeperNodeTracker {
51 private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class);
52 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
53
54 private final static Random rand = new Random();
55
56 @BeforeClass
57 public static void setUpBeforeClass() throws Exception {
58 TEST_UTIL.startMiniZKCluster();
59 }
60
61 @AfterClass
62 public static void tearDownAfterClass() throws Exception {
63 TEST_UTIL.shutdownMiniZKCluster();
64 }
65
66
67
68
69
70
71 @Test public void testInterruptible() throws IOException, InterruptedException {
72 Abortable abortable = new StubAbortable();
73 ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
74 "testInterruptible", abortable);
75 final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
76 tracker.start();
77 Thread t = new Thread() {
78 @Override
79 public void run() {
80 try {
81 tracker.blockUntilAvailable();
82 } catch (InterruptedException e) {
83 throw new RuntimeException("Interrupted", e);
84 }
85 }
86 };
87 t.start();
88 while (!t.isAlive()) Threads.sleep(1);
89 tracker.stop();
90 t.join();
91
92 }
93
94 @Test
95 public void testNodeTracker() throws Exception {
96 Abortable abortable = new StubAbortable();
97 ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
98 "testNodeTracker", abortable);
99 ZKUtil.createAndFailSilent(zk, zk.baseZNode);
100
101 final String node =
102 ZKUtil.joinZNode(zk.baseZNode, new Long(rand.nextLong()).toString());
103
104 final byte [] dataOne = Bytes.toBytes("dataOne");
105 final byte [] dataTwo = Bytes.toBytes("dataTwo");
106
107
108 TestTracker localTracker = new TestTracker(zk, node, abortable);
109 localTracker.start();
110 zk.registerListener(localTracker);
111
112
113 assertNull(localTracker.getData(false));
114
115
116 WaitToGetDataThread thread = new WaitToGetDataThread(zk, node);
117 thread.start();
118
119
120 assertFalse(thread.hasData);
121
122
123 TestTracker secondTracker = new TestTracker(zk, node, null);
124 secondTracker.start();
125 zk.registerListener(secondTracker);
126
127
128 TestingZKListener zkListener = new TestingZKListener(zk, node);
129 zk.registerListener(zkListener);
130 assertEquals(0, zkListener.createdLock.availablePermits());
131
132
133
134 final ZooKeeper zkconn = new ZooKeeper(
135 ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000,
136 new StubWatcher());
137
138
139 zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
140
141
142 zkListener.waitForCreation();
143 thread.join();
144
145
146 assertNotNull(localTracker.getData(false));
147 assertNotNull(localTracker.blockUntilAvailable());
148 assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
149 assertTrue(thread.hasData);
150 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
151 LOG.info("Successfully got data one");
152
153
154 assertNotNull(secondTracker.getData(false));
155 assertNotNull(secondTracker.blockUntilAvailable());
156 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
157 LOG.info("Successfully got data one with the second tracker");
158
159
160 zkconn.delete(node, -1);
161 zkListener.waitForDeletion();
162
163
164 TestTracker threadTracker = thread.tracker;
165 thread = new WaitToGetDataThread(zk, node, threadTracker);
166 thread.start();
167
168
169 assertFalse(thread.hasData);
170 assertNull(secondTracker.getData(false));
171 assertNull(localTracker.getData(false));
172 LOG.info("Successfully made unavailable");
173
174
175 zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
176
177
178 zkListener.waitForCreation();
179 thread.join();
180
181
182 assertNotNull(localTracker.getData(false));
183 assertNotNull(localTracker.blockUntilAvailable());
184 assertTrue(Bytes.equals(localTracker.getData(false), dataTwo));
185 assertNotNull(secondTracker.getData(false));
186 assertNotNull(secondTracker.blockUntilAvailable());
187 assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo));
188 assertTrue(thread.hasData);
189 assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo));
190 LOG.info("Successfully got data two on all trackers and threads");
191
192
193 zkconn.setData(node, dataOne, -1);
194
195
196 zkListener.waitForDataChange();
197
198
199 assertNotNull(localTracker.getData(false));
200 assertNotNull(localTracker.blockUntilAvailable());
201 assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
202 assertNotNull(secondTracker.getData(false));
203 assertNotNull(secondTracker.blockUntilAvailable());
204 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
205 assertTrue(thread.hasData);
206 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
207 LOG.info("Successfully got data one following a data change on all trackers and threads");
208 }
209
210 public static class WaitToGetDataThread extends Thread {
211
212 TestTracker tracker;
213 boolean hasData;
214
215 public WaitToGetDataThread(ZooKeeperWatcher zk, String node) {
216 tracker = new TestTracker(zk, node, null);
217 tracker.start();
218 zk.registerListener(tracker);
219 hasData = false;
220 }
221
222 public WaitToGetDataThread(ZooKeeperWatcher zk, String node,
223 TestTracker tracker) {
224 this.tracker = tracker;
225 hasData = false;
226 }
227
228 @Override
229 public void run() {
230 LOG.info("Waiting for data to be available in WaitToGetDataThread");
231 try {
232 tracker.blockUntilAvailable();
233 } catch (InterruptedException e) {
234 e.printStackTrace();
235 }
236 LOG.info("Data now available in tracker from WaitToGetDataThread");
237 hasData = true;
238 }
239 }
240
241 public static class TestTracker extends ZooKeeperNodeTracker {
242 public TestTracker(ZooKeeperWatcher watcher, String node,
243 Abortable abortable) {
244 super(watcher, node, abortable);
245 }
246 }
247
248 public static class TestingZKListener extends ZooKeeperListener {
249 private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class);
250
251 private Semaphore deletedLock;
252 private Semaphore createdLock;
253 private Semaphore changedLock;
254 private String node;
255
256 public TestingZKListener(ZooKeeperWatcher watcher, String node) {
257 super(watcher);
258 deletedLock = new Semaphore(0);
259 createdLock = new Semaphore(0);
260 changedLock = new Semaphore(0);
261 this.node = node;
262 }
263
264 @Override
265 public void nodeDeleted(String path) {
266 if(path.equals(node)) {
267 LOG.debug("nodeDeleted(" + path + ")");
268 deletedLock.release();
269 }
270 }
271
272 @Override
273 public void nodeCreated(String path) {
274 if(path.equals(node)) {
275 LOG.debug("nodeCreated(" + path + ")");
276 createdLock.release();
277 }
278 }
279
280 @Override
281 public void nodeDataChanged(String path) {
282 if(path.equals(node)) {
283 LOG.debug("nodeDataChanged(" + path + ")");
284 changedLock.release();
285 }
286 }
287
288 public void waitForDeletion() throws InterruptedException {
289 deletedLock.acquire();
290 }
291
292 public void waitForCreation() throws InterruptedException {
293 createdLock.acquire();
294 }
295
296 public void waitForDataChange() throws InterruptedException {
297 changedLock.acquire();
298 }
299 }
300
301 public static class StubAbortable implements Abortable {
302 @Override
303 public void abort(final String msg, final Throwable t) {}
304
305 @Override
306 public boolean isAborted() {
307 return false;
308 }
309
310 }
311
312 public static class StubWatcher implements Watcher {
313 @Override
314 public void process(WatchedEvent event) {}
315 }
316
317 @Test
318 public void testCleanZNode() throws Exception {
319 ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
320 "testNodeTracker", new TestZooKeeperNodeTracker.StubAbortable());
321
322 final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L);
323
324 ZKUtil.createAndFailSilent(zkw,
325 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT,
326 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
327
328 final String nodeName = zkw.getMasterAddressZNode();
329
330
331 ZKUtil.createAndFailSilent(zkw, nodeName);
332 MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
333 Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
334
335
336 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn));
337 MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString());
338 Assert.assertFalse(ZKUtil.getData(zkw, nodeName) == null);
339
340
341 ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn));
342 MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
343 Assert.assertTrue( ZKUtil.getData(zkw, nodeName)== null );
344
345
346 MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
347 }
348
349 }
350