1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.CellScannable;
29 import org.apache.hadoop.hbase.CellScanner;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.testclassification.MediumTests;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
35 import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
36 import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
37 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
38 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44
45 import com.google.common.collect.Lists;
46
47 @Category(MediumTests.class)
48 public class TestRpcControllerFactory {
49
50 public static class StaticRpcControllerFactory extends RpcControllerFactory {
51
52 public StaticRpcControllerFactory(Configuration conf) {
53 super(conf);
54 }
55
56 public PayloadCarryingRpcController newController() {
57 return new CountingRpcController(super.newController());
58 }
59
60 public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
61 return new CountingRpcController(super.newController(cellScanner));
62 }
63
64 public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
65 return new CountingRpcController(super.newController(cellIterables));
66 }
67 }
68
69 public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
70
71 private static AtomicInteger INT_PRIORITY = new AtomicInteger();
72 private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
73
74 public CountingRpcController(PayloadCarryingRpcController delegate) {
75 super(delegate);
76 }
77
78 @Override
79 public void setPriority(int priority) {
80 super.setPriority(priority);
81 INT_PRIORITY.incrementAndGet();
82 }
83
84 @Override
85 public void setPriority(TableName tn) {
86 super.setPriority(tn);
87
88
89 if (!tn.isSystemTable()) {
90 TABLE_PRIORITY.incrementAndGet();
91 }
92
93 }
94 }
95
96 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
97
98 @BeforeClass
99 public static void setup() throws Exception {
100
101
102 Configuration conf = UTIL.getConfiguration();
103 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
104 ProtobufCoprocessorService.class.getName());
105
106 UTIL.startMiniCluster();
107 }
108
109 @AfterClass
110 public static void teardown() throws Exception {
111 UTIL.shutdownMiniCluster();
112 }
113
114
115
116
117
118
119
120 @Test
121 public void testCountController() throws Exception {
122 Configuration conf = new Configuration(UTIL.getConfiguration());
123
124 conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
125 StaticRpcControllerFactory.class.getName());
126
127 TableName name = TableName.valueOf("testcustomcontroller");
128 UTIL.createTable(name, fam1).close();
129
130
131 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
132
133 HTable table = new HTable(conf, name);
134 table.setAutoFlushTo(false);
135 byte[] row = Bytes.toBytes("row");
136 Put p = new Put(row);
137 p.add(fam1, fam1, Bytes.toBytes("val0"));
138 table.put(p);
139 table.flushCommits();
140 Integer counter = 1;
141 counter = verifyCount(counter);
142
143 Delete d = new Delete(row);
144 d.deleteColumn(fam1, fam1);
145 table.delete(d);
146 counter = verifyCount(counter);
147
148 Put p2 = new Put(row);
149 p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
150 table.batch(Lists.newArrayList(p, p2), new Object[2]);
151
152 counter = verifyCount(counter);
153
154 Append append = new Append(row);
155 append.add(fam1, fam1, Bytes.toBytes("val2"));
156 table.append(append);
157 counter = verifyCount(counter);
158
159
160 Get g = new Get(row);
161 table.get(g);
162 counter = verifyCount(counter);
163
164 ResultScanner scan = table.getScanner(fam1);
165 scan.next();
166 scan.close();
167 counter = verifyCount(counter);
168
169 Get g2 = new Get(row);
170 table.get(Lists.newArrayList(g, g2));
171
172 counter = verifyCount(counter);
173
174
175 Scan scanInfo = new Scan(row);
176
177 scanInfo.setSmall(true);
178 counter = doScan(table, scanInfo, counter);
179
180
181 scanInfo.setReversed(true);
182 counter = doScan(table, scanInfo, counter);
183
184
185 scanInfo.setSmall(false);
186 counter = doScan(table, scanInfo, counter);
187
188 table.close();
189 }
190
191 int doScan(HTable table, Scan scan, int expectedCount) throws IOException {
192 ResultScanner results = table.getScanner(scan);
193 results.next();
194 results.close();
195 return verifyCount(expectedCount);
196 }
197
198 int verifyCount(Integer counter) {
199 assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
200 assertEquals(0, CountingRpcController.INT_PRIORITY.get());
201 return counter + 1;
202 }
203 }