1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.testclassification.MediumTests;
32 import org.apache.hadoop.hbase.MiniHBaseCluster;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.client.HBaseAdmin;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.regionserver.HRegion;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.JVMClusterUtil;
41
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45
46 import org.junit.experimental.categories.Category;
47
48 @Category(MediumTests.class)
49 public class TestAssignmentListener {
50 private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
51
52 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
53
54 static class DummyListener {
55 protected AtomicInteger modified = new AtomicInteger(0);
56
57 public void awaitModifications(int count) throws InterruptedException {
58 while (!modified.compareAndSet(count, 0)) {
59 Thread.sleep(100);
60 }
61 }
62 }
63
64 static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
65 private AtomicInteger closeCount = new AtomicInteger(0);
66 private AtomicInteger openCount = new AtomicInteger(0);
67
68 public DummyAssignmentListener() {
69 }
70
71 public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
72 LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
73 openCount.incrementAndGet();
74 modified.incrementAndGet();
75 }
76
77 public void regionClosed(final HRegionInfo regionInfo) {
78 LOG.info("Assignment close region=" + regionInfo);
79 closeCount.incrementAndGet();
80 modified.incrementAndGet();
81 }
82
83 public void reset() {
84 openCount.set(0);
85 closeCount.set(0);
86 }
87
88 public int getLoadCount() {
89 return openCount.get();
90 }
91
92 public int getCloseCount() {
93 return closeCount.get();
94 }
95 }
96
97 static class DummyServerListener extends DummyListener implements ServerListener {
98 private AtomicInteger removedCount = new AtomicInteger(0);
99 private AtomicInteger addedCount = new AtomicInteger(0);
100
101 public DummyServerListener() {
102 }
103
104 public void serverAdded(final ServerName serverName) {
105 LOG.info("Server added " + serverName);
106 addedCount.incrementAndGet();
107 modified.incrementAndGet();
108 }
109
110 public void serverRemoved(final ServerName serverName) {
111 LOG.info("Server removed " + serverName);
112 removedCount.incrementAndGet();
113 modified.incrementAndGet();
114 }
115
116 public void reset() {
117 addedCount.set(0);
118 removedCount.set(0);
119 }
120
121 public int getAddedCount() {
122 return addedCount.get();
123 }
124
125 public int getRemovedCount() {
126 return removedCount.get();
127 }
128 }
129
130 @BeforeClass
131 public static void beforeAllTests() throws Exception {
132 TEST_UTIL.startMiniCluster(2);
133 }
134
135 @AfterClass
136 public static void afterAllTests() throws Exception {
137 TEST_UTIL.shutdownMiniCluster();
138 }
139
140 @Test(timeout=60000)
141 public void testServerListener() throws IOException, InterruptedException {
142 ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
143
144 DummyServerListener listener = new DummyServerListener();
145 serverManager.registerListener(listener);
146 try {
147 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
148
149
150 miniCluster.startRegionServer();
151 listener.awaitModifications(1);
152 assertEquals(1, listener.getAddedCount());
153 assertEquals(0, listener.getRemovedCount());
154
155
156 listener.reset();
157 miniCluster.startRegionServer();
158 listener.awaitModifications(1);
159 assertEquals(1, listener.getAddedCount());
160 assertEquals(0, listener.getRemovedCount());
161
162 int nrs = miniCluster.getRegionServerThreads().size();
163
164
165 listener.reset();
166 miniCluster.stopRegionServer(nrs - 1);
167 listener.awaitModifications(1);
168 assertEquals(0, listener.getAddedCount());
169 assertEquals(1, listener.getRemovedCount());
170
171
172 listener.reset();
173 miniCluster.stopRegionServer(nrs - 2);
174 listener.awaitModifications(1);
175 assertEquals(0, listener.getAddedCount());
176 assertEquals(1, listener.getRemovedCount());
177 } finally {
178 serverManager.unregisterListener(listener);
179 }
180 }
181
182 @Test(timeout=60000)
183 public void testAssignmentListener() throws IOException, InterruptedException {
184 AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
185 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
186
187 DummyAssignmentListener listener = new DummyAssignmentListener();
188 am.registerListener(listener);
189 try {
190 final String TABLE_NAME_STR = "testtb";
191 final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
192 final byte[] FAMILY = Bytes.toBytes("cf");
193
194
195 LOG.info("Create Table");
196 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
197 listener.awaitModifications(1);
198 assertEquals(1, listener.getLoadCount());
199 assertEquals(0, listener.getCloseCount());
200
201
202 HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
203 try {
204 for (int i = 0; i < 10; ++i) {
205 byte[] key = Bytes.toBytes("row-" + i);
206 Put put = new Put(key);
207 put.add(FAMILY, null, key);
208 table.put(put);
209 }
210 } finally {
211 table.close();
212 }
213
214
215 LOG.info("Split Table");
216 listener.reset();
217 admin.split(TABLE_NAME_STR, "row-3");
218 listener.awaitModifications(3);
219 assertEquals(2, listener.getLoadCount());
220 assertEquals(1, listener.getCloseCount());
221
222
223 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
224 int mergeable = 0;
225 while (mergeable < 2) {
226 Thread.sleep(100);
227 admin.majorCompact(TABLE_NAME_STR);
228 mergeable = 0;
229 for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
230 for (HRegion region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
231 mergeable += region.isMergeable() ? 1 : 0;
232 }
233 }
234 }
235
236
237 LOG.info("Merge Regions");
238 listener.reset();
239 List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
240 assertEquals(2, regions.size());
241 admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
242 regions.get(1).getEncodedNameAsBytes(), true);
243 listener.awaitModifications(3);
244 assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
245 assertEquals(1, listener.getLoadCount());
246 assertEquals(2, listener.getCloseCount());
247
248
249 LOG.info("Drop Table");
250 listener.reset();
251 TEST_UTIL.deleteTable(TABLE_NAME);
252 listener.awaitModifications(1);
253 assertEquals(0, listener.getLoadCount());
254 assertEquals(1, listener.getCloseCount());
255 } finally {
256 am.unregisterListener(listener);
257 }
258 }
259 }