1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
31 import org.apache.hadoop.hbase.testclassification.SmallTests;
32 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
33 import org.junit.experimental.categories.Category;
34 import org.junit.runner.RunWith;
35 import org.junit.runners.Parameterized;
36 import org.junit.runners.Parameterized.Parameters;
37 import org.junit.Test;
38
39
40
41
42
43 @Category(SmallTests.class)
44 @RunWith(Parameterized.class)
45 public class TestCallQueue {
46
47 public static final Log LOG = LogFactory.getLog(TestCallQueue.class);
48 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
49
50 private static final MetricsAssertHelper metricsHelper =
51 CompatibilitySingletonFactory.getInstance(MetricsAssertHelper.class);
52
53 private int elementsAdded;
54 private int elementsRemoved;
55
56 @Parameters
57 public static Collection<Object[]> getParameters() {
58 Collection<Object[]> parameters = new ArrayList<Object[]>();
59 for (int elementsAdded : new int[] {100, 200, 300}) {
60 for (int elementsRemoved : new int[] {0, 20, 100}) {
61 parameters.add(new Object[]{new Integer(elementsAdded),
62 new Integer(elementsRemoved)});
63 }
64 }
65 return parameters;
66 }
67
68 public TestCallQueue(int elementsAdded, int elementsRemoved) {
69 this.elementsAdded = elementsAdded;
70 this.elementsRemoved = elementsRemoved;
71 LOG.debug("elementsAdded:" + elementsAdded +
72 " elementsRemoved:" + elementsRemoved);
73
74 }
75
76 @Test(timeout = 60000)
77 public void testPutTake() throws Exception {
78 ThriftMetrics metrics = createMetrics();
79 CallQueue callQueue = new CallQueue(
80 new LinkedBlockingQueue<Call>(), metrics);
81 for (int i = 0; i < elementsAdded; ++i) {
82 callQueue.put(createDummyRunnable());
83 }
84 for (int i = 0; i < elementsRemoved; ++i) {
85 callQueue.take();
86 }
87 verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
88 }
89
90 @Test(timeout = 60000)
91 public void testOfferPoll() throws Exception {
92 ThriftMetrics metrics = createMetrics();
93 CallQueue callQueue = new CallQueue(
94 new LinkedBlockingQueue<Call>(), metrics);
95 for (int i = 0; i < elementsAdded; ++i) {
96 callQueue.offer(createDummyRunnable());
97 }
98 for (int i = 0; i < elementsRemoved; ++i) {
99 callQueue.poll();
100 }
101 verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
102 }
103
104 private static ThriftMetrics createMetrics() throws Exception {
105 Configuration conf = UTIL.getConfiguration();
106 ThriftMetrics m = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
107 m.getSource().init();
108 return m;
109 }
110
111
112 private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
113 throws Exception {
114 metricsHelper.assertCounter(name, expectValue, metrics.getSource());
115 }
116
117 private static Runnable createDummyRunnable() {
118 return new Runnable() {
119 @Override
120 public void run() {
121 }
122 };
123 }
124
125 }
126