/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
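/**
 * Tests how the WAL behaves when a roll is forced while a region server still
 * holds unflushed, deferred-flush edits.
 */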
@Category(MediumTests.class)
public class TestLogRollAbort {
  private static final Log LOG = LogFactory.getLog(TestLogRollAbort.class);
  private static MiniDFSCluster dfsCluster;
  private static HBaseAdmin admin;
  private static MiniHBaseCluster cluster;
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
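  // Turn logging up to ALL on the HDFS and HBase classes this test exercises.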
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
        .getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
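    // Tolerate a couple of consecutive log-roll failures before the region
    // server aborts, and keep RPC timeouts short so errors surface quickly.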
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt(RpcClient.PING_INTERVAL_NAME, 10 * 1000);
    TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT, 10 * 1000);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
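    // Lengthen the pause between client retries.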
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 5 * 1000);
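    // The WAL requires HDFS append/sync support.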
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
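    // Shorter heartbeat settings so the namenode notices datanode restarts quickly.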
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
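    // Extra block-write retries so the DFS client can ride over the datanode
    // restart performed by the test.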
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
  }

  @Before
  public void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    admin = TEST_UTIL.getHBaseAdmin();
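    // Disable the load balancer so regions do not move during the test.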
    cluster.getMaster().balanceSwitch(false);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
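  /**
   * Writes deferred-flush edits, leaves one of them unsynced, restarts the
   * datanodes to break the WAL's HDFS pipeline, and then verifies that a
   * forced log roll still reports the unsynced edits as outstanding.
   */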
  @Test
  public void testRSAbortWithUnflushedEdits() throws Exception {
    LOG.info("Starting testRSAbortWithUnflushedEdits()");
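    // When the meta table can be opened, the region servers are running.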
    new HTable(TEST_UTIL.getConfiguration(),
        TableName.META_TABLE_NAME).close();
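    // Create a deferred-log-flush table so puts are not synced to the WAL immediately.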
    String tableName = this.getClass().getSimpleName();
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    desc.setDeferredLogFlush(true);

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
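    // Find the region server hosting the table and get hold of its WAL.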
    HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    HLog log = server.getWAL();

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    assertTrue("Need append support for this test",
        FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
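    // Write one edit and sync it so the WAL file has an established pipeline.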
    Put p = new Put(Bytes.toBytes("row2001"));
    p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
    table.put(p);

    log.sync();
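    // Write a second edit but do not sync it; with deferred log flush it stays unsynced.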
    p = new Put(Bytes.toBytes("row2002"));
    p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
    table.put(p);
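    // Bounce the datanodes to break the WAL writer's HDFS pipeline.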
    dfsCluster.restartDataNodes();
    LOG.info("Restarted datanodes");
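    // Rolling the writer should now fail to close the old log cleanly, and the
    // unsynced edit must still be reported as outstanding.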
    try {
      log.rollWriter(true);
      fail("Log roll should have thrown FailedLogCloseException");
    } catch (FailedLogCloseException flce) {
      assertTrue("Should have deferred flush log edits outstanding",
          ((FSHLog) log).hasUnSyncedEntries());
    }
  }
}