1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.client.HBaseAdmin;
35 import org.apache.hadoop.hbase.client.HTable;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
38 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
39 import org.apache.hadoop.hbase.regionserver.HRegion;
40 import org.apache.hadoop.hbase.regionserver.HRegionServer;
41 import org.apache.hadoop.hbase.regionserver.HStore;
42 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
43 import org.apache.hadoop.hbase.regionserver.Store;
44 import org.apache.hadoop.hbase.regionserver.StoreFile;
45 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
46 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
47 import org.apache.hadoop.hbase.regionserver.wal.HLog;
48 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
49 import org.apache.hadoop.hbase.testclassification.MediumTests;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
52 import org.junit.Ignore;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55
56 import com.google.common.collect.Lists;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 @Category(MediumTests.class)
77 public class TestIOFencing {
/** Logger for this test; the nested region helper classes log through it as well. */
static final Log LOG = LogFactory.getLog(TestIOFencing.class);
static {
  // Intentionally empty: this static initializer performs no work as written.
}
90
91 public abstract static class CompactionBlockerRegion extends HRegion {
92 volatile int compactCount = 0;
93 volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
94 volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
95
96 @SuppressWarnings("deprecation")
97 public CompactionBlockerRegion(Path tableDir, HLog log,
98 FileSystem fs, Configuration confParam, HRegionInfo info,
99 HTableDescriptor htd, RegionServerServices rsServices) {
100 super(tableDir, log, fs, confParam, info, htd, rsServices);
101 }
102
103 public void stopCompactions() {
104 compactionsBlocked = new CountDownLatch(1);
105 compactionsWaiting = new CountDownLatch(1);
106 }
107
108 public void allowCompactions() {
109 LOG.debug("allowing compactions");
110 compactionsBlocked.countDown();
111 }
112 public void waitForCompactionToBlock() throws IOException {
113 try {
114 LOG.debug("waiting for compaction to block");
115 compactionsWaiting.await();
116 LOG.debug("compaction block reached");
117 } catch (InterruptedException ex) {
118 throw new IOException(ex);
119 }
120 }
121
122 @Override
123 public boolean compact(CompactionContext compaction, Store store,
124 CompactionThroughputController throughputController) throws IOException {
125 try {
126 return super.compact(compaction, store, throughputController);
127 } finally {
128 compactCount++;
129 }
130 }
131
132 public int countStoreFiles() {
133 int count = 0;
134 for (Store store : stores.values()) {
135 count += store.getStorefilesCount();
136 }
137 return count;
138 }
139 }
140
141
142
143
144
145 public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
146
147 public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
148 FileSystem fs, Configuration confParam, HRegionInfo info,
149 HTableDescriptor htd, RegionServerServices rsServices) {
150 super(tableDir, log, fs, confParam, info, htd, rsServices);
151 }
152 @Override
153 protected void doRegionCompactionPrep() throws IOException {
154 compactionsWaiting.countDown();
155 try {
156 compactionsBlocked.await();
157 } catch (InterruptedException ex) {
158 throw new IOException();
159 }
160 super.doRegionCompactionPrep();
161 }
162 }
163
164
165
166
167
168
169 public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
170 public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
171 FileSystem fs, Configuration confParam, HRegionInfo info,
172 HTableDescriptor htd, RegionServerServices rsServices) {
173 super(tableDir, log, fs, confParam, info, htd, rsServices);
174 }
175 @Override
176 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
177 return new BlockCompactionsInCompletionHStore(this, family, this.conf);
178 }
179 }
180
181 public static class BlockCompactionsInCompletionHStore extends HStore {
182 CompactionBlockerRegion r;
183 protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
184 Configuration confParam) throws IOException {
185 super(region, family, confParam);
186 r = (CompactionBlockerRegion) region;
187 }
188
189 @Override
190 protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
191 try {
192 r.compactionsWaiting.countDown();
193 r.compactionsBlocked.await();
194 } catch (InterruptedException ex) {
195 throw new IOException(ex);
196 }
197 super.completeCompaction(compactedFiles);
198 }
199 }
200
201 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
202 private final static TableName TABLE_NAME =
203 TableName.valueOf("tabletest");
204 private final static byte[] FAMILY = Bytes.toBytes("family");
205 private static final int FIRST_BATCH_COUNT = 4000;
206 private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
207
208
209
210
211
212
213
214 @Ignore("See HBASE-10298")
215 @Test
216 public void testFencingAroundCompaction() throws Exception {
217 doTest(BlockCompactionsInPrepRegion.class, false);
218 doTest(BlockCompactionsInPrepRegion.class, true);
219 }
220
221
222
223
224
225
226
227 @Ignore("See HBASE-10298")
228 @Test
229 public void testFencingAroundCompactionAfterWALSync() throws Exception {
230 doTest(BlockCompactionsInCompletionRegion.class, false);
231 doTest(BlockCompactionsInCompletionRegion.class, true);
232 }
233
234 public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
235 Configuration c = TEST_UTIL.getConfiguration();
236 c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
237
238 c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
239 c.setBoolean("dfs.support.append", true);
240
241 c.setLong("hbase.hregion.memstore.flush.size", 200000);
242 c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
243
244 c.setInt("hbase.hstore.compactionThreshold", 1000);
245 c.setLong("hbase.hstore.blockingStoreFiles", 1000);
246
247 c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
248 LOG.info("Starting mini cluster");
249 TEST_UTIL.startMiniCluster(1);
250 CompactionBlockerRegion compactingRegion = null;
251 HBaseAdmin admin = null;
252 try {
253 LOG.info("Creating admin");
254 admin = new HBaseAdmin(c);
255 LOG.info("Creating table");
256 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
257 HTable table = new HTable(c, TABLE_NAME);
258 LOG.info("Loading test table");
259
260 List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
261 assertEquals(1, testRegions.size());
262 compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
263 LOG.info("Blocking compactions");
264 compactingRegion.stopCompactions();
265 long lastFlushTime = compactingRegion.getLastFlushTime();
266
267 TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
268
269
270
271 HRegionInfo oldHri = new HRegionInfo(table.getName(),
272 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
273 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
274 FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
275 new Path("store_dir"));
276 HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
277 oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
278
279
280 long startWaitTime = System.currentTimeMillis();
281 while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
282 compactingRegion.countStoreFiles() <= 1) {
283 LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString());
284 Thread.sleep(1000);
285 assertTrue("Timed out waiting for the region to flush",
286 System.currentTimeMillis() - startWaitTime < 30000);
287 }
288 assertTrue(compactingRegion.countStoreFiles() > 1);
289 final byte REGION_NAME[] = compactingRegion.getRegionName();
290 LOG.info("Asking for compaction");
291 admin.majorCompact(TABLE_NAME.getName());
292 LOG.info("Waiting for compaction to be about to start");
293 compactingRegion.waitForCompactionToBlock();
294 LOG.info("Starting a new server");
295 RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
296 final HRegionServer newServer = newServerThread.getRegionServer();
297 LOG.info("Killing region server ZK lease");
298 TEST_UTIL.expireRegionServerSession(0);
299 CompactionBlockerRegion newRegion = null;
300 startWaitTime = System.currentTimeMillis();
301 LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
302
303
304 Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
305 @Override
306 public boolean evaluate() throws Exception {
307 HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
308 return newRegion != null && !newRegion.isRecovering();
309 }
310 });
311
312 newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
313
314 LOG.info("Allowing compaction to proceed");
315 compactingRegion.allowCompactions();
316 while (compactingRegion.compactCount == 0) {
317 Thread.sleep(1000);
318 }
319
320
321 LOG.info("Compaction finished");
322
323
324 FileSystem fs = newRegion.getFilesystem();
325 for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
326 assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
327 }
328
329
330 TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
331 admin.majorCompact(TABLE_NAME.getName());
332 startWaitTime = System.currentTimeMillis();
333 while (newRegion.compactCount == 0) {
334 Thread.sleep(1000);
335 assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000);
336 }
337 assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
338 } finally {
339 if (compactingRegion != null) {
340 compactingRegion.allowCompactions();
341 }
342 admin.close();
343 TEST_UTIL.shutdownMiniCluster();
344 }
345 }
346 }