1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Matchers.anyString;
22 import static org.mockito.Matchers.eq;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doThrow;
25 import static org.mockito.Mockito.inOrder;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.never;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.verifyZeroInteractions;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.util.concurrent.ThreadPoolExecutor;
35
36 import org.apache.hadoop.hbase.errorhandling.ForeignException;
37 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
38 import org.apache.hadoop.hbase.errorhandling.TimeoutException;
39 import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
40 import org.apache.hadoop.hbase.testclassification.SmallTests;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.mockito.InOrder;
45 import org.mockito.Mockito;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48
49
50
51
52 @Category(SmallTests.class)
53 public class TestProcedureMember {
54 private static final long WAKE_FREQUENCY = 100;
55 private static final long TIMEOUT = 100000;
56 private static final long POOL_KEEP_ALIVE = 1;
57
58 private final String op = "some op";
59 private final byte[] data = new byte[0];
60 private final ForeignExceptionDispatcher mockListener = Mockito
61 .spy(new ForeignExceptionDispatcher());
62 private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
63 private final ProcedureMemberRpcs mockMemberComms = Mockito
64 .mock(ProcedureMemberRpcs.class);
65 private ProcedureMember member;
66 private ForeignExceptionDispatcher dispatcher;
67 Subprocedure spySub;
68
69
70
71
72 @After
73 public void resetTest() {
74 reset(mockListener, mockBuilder, mockMemberComms);
75 if (member != null)
76 try {
77 member.close();
78 } catch (IOException e) {
79 e.printStackTrace();
80 }
81 }
82
83
84
85
86
87 private ProcedureMember buildCohortMember() {
88 String name = "node";
89 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
90 return new ProcedureMember(mockMemberComms, pool, mockBuilder);
91 }
92
93
94
95
96 private void buildCohortMemberPair() throws IOException {
97 dispatcher = new ForeignExceptionDispatcher();
98 String name = "node";
99 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
100 member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
101 when(mockMemberComms.getMemberName()).thenReturn("membername");
102 Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
103 spySub = spy(subproc);
104 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
105 addCommitAnswer();
106 }
107
108
109
110
111
112 private void addCommitAnswer() throws IOException {
113 doAnswer(new Answer<Void>() {
114 @Override
115 public Void answer(InvocationOnMock invocation) throws Throwable {
116 member.receivedReachedGlobalBarrier(op);
117 return null;
118 }
119 }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
120 }
121
122
123
124
125 @Test(timeout = 500)
126 public void testSimpleRun() throws Exception {
127 member = buildCohortMember();
128 EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
129 EmptySubprocedure spy = spy(subproc);
130 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
131
132
133 addCommitAnswer();
134
135
136
137 Subprocedure subproc1 = member.createSubprocedure(op, data);
138 member.submitSubprocedure(subproc1);
139
140 subproc.waitForLocallyCompleted();
141
142
143 InOrder order = inOrder(mockMemberComms, spy);
144 order.verify(spy).acquireBarrier();
145 order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
146 order.verify(spy).insideBarrier();
147 order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
148 order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
149 any(ForeignException.class));
150 }
151
152
153
154
155
156 @Test(timeout = 60000)
157 public void testMemberPrepareException() throws Exception {
158 buildCohortMemberPair();
159
160
161 doAnswer(
162 new Answer<Void>() {
163 @Override
164 public Void answer(InvocationOnMock invocation) throws Throwable {
165 throw new IOException("Forced IOException in member acquireBarrier");
166 }
167 }).when(spySub).acquireBarrier();
168
169
170
171 Subprocedure subproc = member.createSubprocedure(op, data);
172 member.submitSubprocedure(subproc);
173
174 member.closeAndWait(TIMEOUT);
175
176
177 InOrder order = inOrder(mockMemberComms, spySub);
178 order.verify(spySub).acquireBarrier();
179
180 order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
181 order.verify(spySub, never()).insideBarrier();
182 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
183
184 order.verify(spySub).cancel(anyString(), any(Exception.class));
185 order.verify(spySub).cleanup(any(Exception.class));
186 }
187
188
189
190
191 @Test(timeout = 60000)
192 public void testSendMemberAcquiredCommsFailure() throws Exception {
193 buildCohortMemberPair();
194
195
196 doAnswer(
197 new Answer<Void>() {
198 @Override
199 public Void answer(InvocationOnMock invocation) throws Throwable {
200 throw new IOException("Forced IOException in memeber prepare");
201 }
202 }).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
203
204
205
206 Subprocedure subproc = member.createSubprocedure(op, data);
207 member.submitSubprocedure(subproc);
208
209 member.closeAndWait(TIMEOUT);
210
211
212 InOrder order = inOrder(mockMemberComms, spySub);
213 order.verify(spySub).acquireBarrier();
214 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
215
216
217 order.verify(spySub, never()).insideBarrier();
218 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
219
220 order.verify(spySub).cancel(anyString(), any(Exception.class));
221 order.verify(spySub).cleanup(any(Exception.class));
222 }
223
224
225
226
227
228
229
230 @Test(timeout = 60000)
231 public void testCoordinatorAbort() throws Exception {
232 buildCohortMemberPair();
233
234
235 final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
236 doAnswer(
237 new Answer<Void>() {
238 @Override
239 public Void answer(InvocationOnMock invocation) throws Throwable {
240
241 spySub.cancel("bogus message", oate);
242
243 Thread.sleep(WAKE_FREQUENCY);
244 return null;
245 }
246 }).when(spySub).waitForReachedGlobalBarrier();
247
248
249
250 Subprocedure subproc = member.createSubprocedure(op, data);
251 member.submitSubprocedure(subproc);
252
253 member.closeAndWait(TIMEOUT);
254
255
256 InOrder order = inOrder(mockMemberComms, spySub);
257 order.verify(spySub).acquireBarrier();
258 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
259
260 order.verify(spySub, never()).insideBarrier();
261 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
262
263 order.verify(spySub).cancel(anyString(), any(Exception.class));
264 order.verify(spySub).cleanup(any(Exception.class));
265 }
266
267
268
269
270
271
272
273
274
275 @Test(timeout = 60000)
276 public void testMemberCommitException() throws Exception {
277 buildCohortMemberPair();
278
279
280 doAnswer(
281 new Answer<Void>() {
282 @Override
283 public Void answer(InvocationOnMock invocation) throws Throwable {
284 throw new IOException("Forced IOException in memeber prepare");
285 }
286 }).when(spySub).insideBarrier();
287
288
289
290 Subprocedure subproc = member.createSubprocedure(op, data);
291 member.submitSubprocedure(subproc);
292
293 member.closeAndWait(TIMEOUT);
294
295
296 InOrder order = inOrder(mockMemberComms, spySub);
297 order.verify(spySub).acquireBarrier();
298 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
299 order.verify(spySub).insideBarrier();
300
301
302 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
303
304 order.verify(spySub).cancel(anyString(), any(Exception.class));
305 order.verify(spySub).cleanup(any(Exception.class));
306 }
307
308
309
310
311
312
313
314
315
316 @Test(timeout = 60000)
317 public void testMemberCommitCommsFailure() throws Exception {
318 buildCohortMemberPair();
319 final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
320 doAnswer(
321 new Answer<Void>() {
322 @Override
323 public Void answer(InvocationOnMock invocation) throws Throwable {
324
325 spySub.cancel("commit comms fail", oate);
326
327 Thread.sleep(WAKE_FREQUENCY);
328 return null;
329 }
330 }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
331
332
333
334 Subprocedure subproc = member.createSubprocedure(op, data);
335 member.submitSubprocedure(subproc);
336
337 member.closeAndWait(TIMEOUT);
338
339
340 InOrder order = inOrder(mockMemberComms, spySub);
341 order.verify(spySub).acquireBarrier();
342 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
343 order.verify(spySub).insideBarrier();
344 order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
345
346 order.verify(spySub).cancel(anyString(), any(Exception.class));
347 order.verify(spySub).cleanup(any(Exception.class));
348 }
349
350
351
352
353
354 @Test(timeout = 60000)
355 public void testPropagateConnectionErrorBackToManager() throws Exception {
356
357 member = buildCohortMember();
358 ProcedureMember memberSpy = spy(member);
359
360
361 final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
362 ForeignExceptionDispatcher dispSpy = spy(dispatcher);
363 Subprocedure commit = new EmptySubprocedure(member, dispatcher);
364 Subprocedure spy = spy(commit);
365 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
366
367
368 doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
369
370 doThrow(new IOException("Controller is down!")).when(mockMemberComms)
371 .sendMemberAborted(eq(spy), any(ForeignException.class));
372
373
374
375
376 Subprocedure subproc = memberSpy.createSubprocedure(op, data);
377 memberSpy.submitSubprocedure(subproc);
378
379 memberSpy.closeAndWait(TIMEOUT);
380
381
382 InOrder order = inOrder(mockMemberComms, spy, dispSpy);
383
384 order.verify(spy).acquireBarrier();
385 order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
386
387
388
389
390
391
392
393 }
394
395
396
397
398
399
400 @Test
401 public void testNoTaskToBeRunFromRequest() throws Exception {
402 ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
403 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
404 .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
405 member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
406
407
408 Subprocedure subproc = member.createSubprocedure(op, data);
409 member.submitSubprocedure(subproc);
410
411 try {
412
413 Subprocedure subproc2 = member.createSubprocedure(op, data);
414 member.submitSubprocedure(subproc2);
415 } catch (IllegalStateException ise) {
416 }
417
418 try {
419
420 Subprocedure subproc3 = member.createSubprocedure(op, data);
421 member.submitSubprocedure(subproc3);
422 } catch (IllegalArgumentException iae) {
423 }
424
425
426 verifyZeroInteractions(pool);
427
428
429
430 }
431
432
433
434
435 public class EmptySubprocedure extends SubprocedureImpl {
436 public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
437 super( member, op, dispatcher,
438
439 WAKE_FREQUENCY, TIMEOUT);
440 }
441 }
442 }