1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Matchers.anyListOf;
23 import static org.mockito.Matchers.eq;
24 import static org.mockito.Mockito.atMost;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.when;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.List;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.Abortable;
40 import org.apache.hadoop.hbase.HBaseTestingUtility;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.errorhandling.ForeignException;
43 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
44 import org.apache.hadoop.hbase.errorhandling.TimeoutException;
45 import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
46 import org.apache.hadoop.hbase.util.Pair;
47 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48 import org.junit.AfterClass;
49 import org.junit.BeforeClass;
50 import org.junit.Test;
51 import org.junit.experimental.categories.Category;
52 import org.mockito.Mockito;
53 import org.mockito.internal.matchers.ArrayEquals;
54 import org.mockito.invocation.InvocationOnMock;
55 import org.mockito.stubbing.Answer;
56 import org.mockito.verification.VerificationMode;
57
58 import com.google.common.collect.Lists;
59
60
61
62
63 @Category(MediumTests.class)
64 public class TestZKProcedure {
65
66 private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
67 private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
68 private static final String COORDINATOR_NODE_NAME = "coordinator";
69 private static final long KEEP_ALIVE = 100;
70 private static final int POOL_SIZE = 1;
71 private static final long TIMEOUT = 10000;
72 private static final long WAKE_FREQUENCY = 500;
73 private static final String opName = "op";
74 private static final byte[] data = new byte[] { 1, 2 };
75 private static final VerificationMode once = Mockito.times(1);
76
77 @BeforeClass
78 public static void setupTest() throws Exception {
79 UTIL.startMiniZKCluster();
80 }
81
82 @AfterClass
83 public static void cleanupTest() throws Exception {
84 UTIL.shutdownMiniZKCluster();
85 }
86
87 private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
88 return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
89 @Override
90 public void abort(String why, Throwable e) {
91 throw new RuntimeException(
92 "Unexpected abort in distributed three phase commit test:" + why, e);
93 }
94
95 @Override
96 public boolean isAborted() {
97 return false;
98 }
99 });
100 }
101
102 @Test
103 public void testEmptyMemberSet() throws Exception {
104 runCommit();
105 }
106
107 @Test
108 public void testSingleMember() throws Exception {
109 runCommit("one");
110 }
111
112 @Test
113 public void testMultipleMembers() throws Exception {
114 runCommit("one", "two", "three", "four" );
115 }
116
117 private void runCommit(String... members) throws Exception {
118
119 if (members == null) {
120 members = new String[0];
121 }
122 List<String> expected = Arrays.asList(members);
123
124
125 ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
126 String opDescription = "coordination test - " + members.length + " cohort members";
127
128
129 ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
130 coordZkw, opDescription, COORDINATOR_NODE_NAME);
131 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
132 ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
133 @Override
134 public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
135 List<String> expectedMembers) {
136 return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
137 }
138 };
139
140
141
142 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
143 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
144 members.length);
145
146 for (String member : members) {
147 ZooKeeperWatcher watcher = newZooKeeperWatcher();
148 ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
149 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
150 ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
151 procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
152 comms.start(member, procMember);
153 }
154
155
156 final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
157 for (int i = 0; i < procMembers.size(); i++) {
158 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
159 Subprocedure commit = Mockito
160 .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
161 WAKE_FREQUENCY, TIMEOUT));
162 subprocs.add(commit);
163 }
164
165
166 final AtomicInteger i = new AtomicInteger(0);
167 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
168 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
169 new Answer<Subprocedure>() {
170 @Override
171 public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
172 int index = i.getAndIncrement();
173 LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
174 Subprocedure commit = subprocs.get(index);
175 return commit;
176 }
177 });
178
179
180
181
182
183
184 Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
185
186
187
188
189 waitAndVerifyProc(task, once, once, never(), once, false);
190 verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
191
192
193 closeAll(coordinator, coordinatorComms, procMembers);
194 }
195
196
197
198
199
200 @Test
201 public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
202 String opDescription = "error injection coordination";
203 String[] cohortMembers = new String[] { "one", "two", "three" };
204 List<String> expected = Lists.newArrayList(cohortMembers);
205
206 final int memberErrorIndex = 2;
207 final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
208
209
210 ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
211 ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
212 coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
213 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
214 ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
215
216
217 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
218 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
219 expected.size());
220 for (String member : expected) {
221 ZooKeeperWatcher watcher = newZooKeeperWatcher();
222 ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
223 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
224 ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
225 members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
226 controller.start(member, mem);
227 }
228
229
230 final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
231 final int[] elem = new int[1];
232 for (int i = 0; i < members.size(); i++) {
233 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
234 final ProcedureMember comms = members.get(i).getFirst();
235 Subprocedure commit = Mockito
236 .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
237
238 Mockito.doAnswer(new Answer<Void>() {
239 @Override
240 public Void answer(InvocationOnMock invocation) throws Throwable {
241 int index = elem[0];
242 if (index == memberErrorIndex) {
243 LOG.debug("Sending error to coordinator");
244 ForeignException remoteCause = new ForeignException("TIMER",
245 new TimeoutException("subprocTimeout" , 1, 2, 0));
246 Subprocedure r = ((Subprocedure) invocation.getMock());
247 LOG.error("Remote commit failure, not propagating error:" + remoteCause);
248 comms.receiveAbortProcedure(r.getName(), remoteCause);
249 assertEquals(r.isComplete(), true);
250
251
252 try {
253 Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
254 WAKE_FREQUENCY, "coordinator received error");
255 } catch (InterruptedException e) {
256 LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
257
258 Thread.currentThread().interrupt();
259 }
260 }
261 elem[0] = ++index;
262 return null;
263 }
264 }).when(commit).acquireBarrier();
265 cohortTasks.add(commit);
266 }
267
268
269 final AtomicInteger taskIndex = new AtomicInteger();
270 Mockito.when(
271 subprocFactory.buildSubprocedure(Mockito.eq(opName),
272 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
273 new Answer<Subprocedure>() {
274 @Override
275 public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
276 int index = taskIndex.getAndIncrement();
277 Subprocedure commit = cohortTasks.get(index);
278 return commit;
279 }
280 });
281
282
283 ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
284 .spy(new ForeignExceptionDispatcher());
285 Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
286 coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
287 opName, data, expected));
288 when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
289 .thenReturn(coordinatorTask);
290
291 Mockito.doAnswer(new Answer<Void>() {
292 @Override
293 public Void answer(InvocationOnMock invocation) throws Throwable {
294
295 invocation.callRealMethod();
296
297 coordinatorReceivedErrorLatch.countDown();
298 return null;
299 }
300 }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
301
302
303
304
305
306 Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
307 assertEquals("Didn't mock coordinator task", coordinatorTask, task);
308
309
310 try {
311 task.waitForCompleted();
312 } catch (ForeignException fe) {
313
314 }
315
316
317
318
319
320
321
322 waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
323 verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
324 once, true);
325
326
327 closeAll(coordinator, coordinatorController, members);
328 }
329
330
331
332
333
334
335 private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
336 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
337 throws Exception {
338 boolean caughtError = false;
339 try {
340 proc.waitForCompleted();
341 } catch (ForeignException fe) {
342 caughtError = true;
343 }
344
345 Mockito.verify(proc, prepare).sendGlobalBarrierStart();
346 Mockito.verify(proc, commit).sendGlobalBarrierReached();
347 Mockito.verify(proc, finish).sendGlobalBarrierComplete();
348 assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
349 .hasException());
350 assertEquals("Operation error state was unexpected", opHasError, caughtError);
351
352 }
353
354
355
356
357
358
359 private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
360 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
361 throws Exception {
362 boolean caughtError = false;
363 try {
364 op.waitForLocallyCompleted();
365 } catch (ForeignException fe) {
366 caughtError = true;
367 }
368
369 Mockito.verify(op, prepare).acquireBarrier();
370 Mockito.verify(op, commit).insideBarrier();
371
372
373 assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
374 .hasException());
375 assertEquals("Operation error state was unexpected", opHasError, caughtError);
376
377 }
378
379 private void verifyCohortSuccessful(List<String> cohortNames,
380 SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
381 VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
382 VerificationMode finish, boolean opHasError) throws Exception {
383
384
385 Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
386 Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
387
388 int j = 0;
389 for (Subprocedure op : cohortTasks) {
390 LOG.debug("Checking mock:" + (j++));
391 waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
392 }
393 }
394
395 private void closeAll(
396 ProcedureCoordinator coordinator,
397 ZKProcedureCoordinatorRpcs coordinatorController,
398 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
399 throws IOException {
400
401 for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
402 member.getFirst().close();
403 member.getSecond().close();
404 }
405 coordinator.close();
406 coordinatorController.close();
407 }
408 }