View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Mockito.mock;
25  import static org.mockito.Mockito.when;
26  
27  import java.util.List;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.testclassification.MediumTests;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.SplitLogCounters;
39  import org.apache.hadoop.hbase.SplitLogTask;
40  import org.apache.hadoop.hbase.Waiter;
41  import org.apache.hadoop.hbase.executor.ExecutorService;
42  import org.apache.hadoop.hbase.executor.ExecutorType;
43  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
44  import org.apache.hadoop.hbase.util.CancelableProgressable;
45  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
46  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48  import org.apache.log4j.Level;
49  import org.apache.log4j.Logger;
50  import org.apache.zookeeper.CreateMode;
51  import org.apache.zookeeper.ZooDefs.Ids;
52  import org.junit.After;
53  import org.junit.Before;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  
57  @Category(MediumTests.class)
58  public class TestSplitLogWorker {
59    private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
60    private static final int WAIT_TIME = 15000;
61    private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
62    static {
63      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
64    }
65    private final static HBaseTestingUtility TEST_UTIL =
66      new HBaseTestingUtility();
67    private ZooKeeperWatcher zkw;
68    private SplitLogWorker slw;
69    private ExecutorService executorService;
70    private RecoveryMode mode;
71  
72    private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
73        throws Exception {
74      assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
75        waitForCounterBoolean(ctr, oldval, newval, timems));
76    }
77  
78    private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
79        long timems) throws Exception {
80  
81      return waitForCounterBoolean(ctr, oldval, newval, timems, true);
82    }
83  
84    private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
85        long timems, boolean failIfTimeout) throws Exception {
86  
87      long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
88        new Waiter.Predicate<Exception>() {
89        @Override
90        public boolean evaluate() throws Exception {
91              return (ctr.get() >= newval);
92        }
93      });
94  
95      if( timeWaited > 0) {
96        // when not timed out
97        assertEquals(newval, ctr.get());
98      }
99      return true;
100   }
101 
102   @Before
103   public void setup() throws Exception {
104     TEST_UTIL.startMiniZKCluster();
105     Configuration conf = TEST_UTIL.getConfiguration();
106     zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
107         "split-log-worker-tests", null);
108     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
109     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
110     assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
111     LOG.debug(zkw.baseZNode + " created");
112     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
113     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
114     LOG.debug(zkw.splitLogZNode + " created");
115     ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
116     assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
117     SplitLogCounters.resetCounters();
118     executorService = new ExecutorService("TestSplitLogWorker");
119     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
120     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
121         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
122   }
123 
124   @After
125   public void teardown() throws Exception {
126     if (executorService != null) {
127       executorService.shutdown();
128     }
129     TEST_UTIL.shutdownMiniZKCluster();
130   }
131 
132   SplitLogWorker.TaskExecutor neverEndingTask =
133     new SplitLogWorker.TaskExecutor() {
134 
135       @Override
136       public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
137         while (true) {
138           try {
139             Thread.sleep(1000);
140           } catch (InterruptedException e) {
141             return Status.PREEMPTED;
142           }
143           if (!p.progress()) {
144             return Status.PREEMPTED;
145           }
146         }
147       }
148 
149   };
150 
151   @Test(timeout=60000)
152   public void testAcquireTaskAtStartup() throws Exception {
153     LOG.info("testAcquireTaskAtStartup");
154     SplitLogCounters.resetCounters();
155     final String TATAS = "tatas";
156     final ServerName RS = ServerName.valueOf("rs,1,1");
157     RegionServerServices mockedRS = getRegionServer(RS);
158     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
159       new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), 
160         Ids.OPEN_ACL_UNSAFE,
161         CreateMode.PERSISTENT);
162 
163     SplitLogWorker slw =
164         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
165     slw.start();
166     try {
167       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
168       byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
169       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
170       assertTrue(slt.isOwned(RS));
171     } finally {
172       stopSplitLogWorker(slw);
173     }
174   }
175 
176   private void stopSplitLogWorker(final SplitLogWorker slw)
177   throws InterruptedException {
178     if (slw != null) {
179       slw.stop();
180       slw.worker.join(WAIT_TIME);
181       if (slw.worker.isAlive()) {
182         assertTrue(("Could not stop the worker thread slw=" + slw) == null);
183       }
184     }
185   }
186 
187   @Test(timeout=60000)
188   public void testRaceForTask() throws Exception {
189     LOG.info("testRaceForTask");
190     SplitLogCounters.resetCounters();
191     final String TRFT = "trft";
192     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
193     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
194     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
195       new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
196         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
197     RegionServerServices mockedRS1 = getRegionServer(SVR1);
198     RegionServerServices mockedRS2 = getRegionServer(SVR2);
199     SplitLogWorker slw1 =
200         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
201     SplitLogWorker slw2 =
202         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
203     slw1.start();
204     slw2.start();
205     try {
206       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
207       // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
208       // not it, that we fell through to the next counter in line and it was set.
209       assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
210           WAIT_TIME, false) ||
211         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
212       byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
213       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
214       assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
215     } finally {
216       stopSplitLogWorker(slw1);
217       stopSplitLogWorker(slw2);
218     }
219   }
220 
221   @Test(timeout=60000)
222   public void testPreemptTask() throws Exception {
223     LOG.info("testPreemptTask");
224     SplitLogCounters.resetCounters();
225     final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
226     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
227     RegionServerServices mockedRS = getRegionServer(SRV);
228     SplitLogWorker slw =
229         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
230     slw.start();
231     try {
232       Thread.yield(); // let the worker start
233       Thread.sleep(1000);
234       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
235 
236       // this time create a task node after starting the splitLogWorker
237       zkw.getRecoverableZooKeeper().create(PATH,
238         new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
239         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
240 
241       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
242       assertEquals(1, slw.taskReadySeq);
243       byte [] bytes = ZKUtil.getData(zkw, PATH);
244       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
245       assertTrue(slt.isOwned(SRV));
246       slt = new SplitLogTask.Owned(MANAGER, this.mode);
247       ZKUtil.setData(zkw, PATH, slt.toByteArray());
248       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
249     } finally {
250       stopSplitLogWorker(slw);
251     }
252   }
253 
254   @Test(timeout=60000)
255   public void testMultipleTasks() throws Exception {
256     LOG.info("testMultipleTasks");
257     SplitLogCounters.resetCounters();
258     final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
259     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
260     RegionServerServices mockedRS = getRegionServer(SRV);
261     SplitLogWorker slw =
262         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
263     slw.start();
264     try {
265       Thread.yield(); // let the worker start
266       Thread.sleep(100);
267       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
268 
269       SplitLogTask unassignedManager = 
270         new SplitLogTask.Unassigned(MANAGER, this.mode);
271       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
272         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
273 
274       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
275       // now the worker is busy doing the above task
276 
277       // create another task
278       final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
279       zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
280         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
281 
282       // preempt the first task, have it owned by another worker
283       final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
284       SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
285       ZKUtil.setData(zkw, PATH1, slt.toByteArray());
286       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
287 
288       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
289       assertEquals(2, slw.taskReadySeq);
290       byte [] bytes = ZKUtil.getData(zkw, PATH2);
291       slt = SplitLogTask.parseFrom(bytes);
292       assertTrue(slt.isOwned(SRV));
293     } finally {
294       stopSplitLogWorker(slw);
295     }
296   }
297 
298   @Test(timeout=60000)
299   public void testRescan() throws Exception {
300     LOG.info("testRescan");
301     SplitLogCounters.resetCounters();
302     final ServerName SRV = ServerName.valueOf("svr,1,1");
303     RegionServerServices mockedRS = getRegionServer(SRV);
304     slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
305     slw.start();
306     Thread.yield(); // let the worker start
307     Thread.sleep(100);
308 
309     String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
310     SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
311     zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
312       CreateMode.PERSISTENT);
313 
314     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
315     // now the worker is busy doing the above task
316 
317     // preempt the task, have it owned by another worker
318     ZKUtil.setData(zkw, task, slt.toByteArray());
319     waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
320 
321     // create a RESCAN node
322     String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
323     rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
324       CreateMode.PERSISTENT_SEQUENTIAL);
325 
326     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
327     // RESCAN node might not have been processed if the worker became busy
328     // with the above task. preempt the task again so that now the RESCAN
329     // node is processed
330     ZKUtil.setData(zkw, task, slt.toByteArray());
331     waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
332     waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
333 
334     List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
335     LOG.debug(nodes);
336     int num = 0;
337     for (String node : nodes) {
338       num++;
339       if (node.startsWith("RESCAN")) {
340         String name = ZKSplitLog.getEncodedNodeName(zkw, node);
341         String fn = ZKSplitLog.getFileName(name);
342         byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
343         slt = SplitLogTask.parseFrom(data);
344         assertTrue(slt.toString(), slt.isDone(SRV));
345       }
346     }
347     assertEquals(2, num);
348   }
349 
350   @Test(timeout=60000)
351   public void testAcquireMultiTasks() throws Exception {
352     LOG.info("testAcquireMultiTasks");
353     SplitLogCounters.resetCounters();
354     final String TATAS = "tatas";
355     final ServerName RS = ServerName.valueOf("rs,1,1");
356     final int maxTasks = 3;
357     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
358     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
359     RegionServerServices mockedRS = getRegionServer(RS);
360 
361     for (int i = 0; i < maxTasks; i++) {
362       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
363         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
364           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
365     }
366 
367     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
368     slw.start();
369     try {
370       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
371       for (int i = 0; i < maxTasks; i++) {
372         byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
373         SplitLogTask slt = SplitLogTask.parseFrom(bytes);
374         assertTrue(slt.isOwned(RS));
375       }
376     } finally {
377       stopSplitLogWorker(slw);
378     }
379   }
380 
381   /**
382    * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
383    * RS
384    * @throws Exception
385    */
386   @Test(timeout=60000)
387   public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
388     LOG.info("testAcquireMultiTasks");
389     SplitLogCounters.resetCounters();
390     final String TATAS = "tatas";
391     final ServerName RS = ServerName.valueOf("rs,1,1");
392     final ServerName RS2 = ServerName.valueOf("rs,1,2");
393     final int maxTasks = 3;
394     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
395     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
396     RegionServerServices mockedRS = getRegionServer(RS);
397 
398     // create two RS nodes
399     String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
400     zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
401     rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
402     zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
403 
404     for (int i = 0; i < maxTasks; i++) {
405       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
406         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
407           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
408     }
409 
410     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
411     slw.start();
412     try {
413       int acquiredTasks = 0;
414       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
415       for (int i = 0; i < maxTasks; i++) {
416         byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
417         SplitLogTask slt = SplitLogTask.parseFrom(bytes);
418         if (slt.isOwned(RS)) {
419           acquiredTasks++;
420         }
421       }
422       assertEquals(2, acquiredTasks);
423     } finally {
424       stopSplitLogWorker(slw);
425     }
426   }
427 
428   /**
429    * Create a mocked region server service instance
430    * @param server
431    * @return
432    */
433   private RegionServerServices getRegionServer(ServerName name) {
434 
435     RegionServerServices mockedServer = mock(RegionServerServices.class);
436     when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
437     when(mockedServer.getServerName()).thenReturn(name);
438     when(mockedServer.getZooKeeper()).thenReturn(zkw);
439     when(mockedServer.isStopped()).thenReturn(false);
440     when(mockedServer.getExecutorService()).thenReturn(executorService);
441 
442     return mockedServer;
443   }
444 
445 }