1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FileSystem;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.hbase.HBaseTestingUtility;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.HTableDescriptor;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.testclassification.LargeTests;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.regionserver.wal.HLog;
32 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
33 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
34 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hdfs.MiniDFSCluster;
38 import org.junit.After;
39 import org.junit.AfterClass;
40 import org.junit.Before;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.junit.runner.RunWith;
45 import org.junit.runners.Parameterized;
46 import org.junit.runners.Parameterized.Parameters;
47
48 import static org.junit.Assert.*;
49
50 import java.io.IOException;
51 import java.util.ArrayList;
52 import java.util.Collection;
53 import java.util.List;
54 import java.util.concurrent.atomic.AtomicLong;
55
56 @Category(LargeTests.class)
57 @RunWith(Parameterized.class)
58 public class TestReplicationHLogReaderManager {
59
60 private static HBaseTestingUtility TEST_UTIL;
61 private static Configuration conf;
62 private static Path hbaseDir;
63 private static FileSystem fs;
64 private static MiniDFSCluster cluster;
65 private static final TableName tableName = TableName.valueOf("tablename");
66 private static final byte [] family = Bytes.toBytes("column");
67 private static final byte [] qualifier = Bytes.toBytes("qualifier");
68 private static final HRegionInfo info = new HRegionInfo(tableName,
69 HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
70 private static final HTableDescriptor htd = new HTableDescriptor(tableName);
71
72 private HLog log;
73 private ReplicationHLogReaderManager logManager;
74 private PathWatcher pathWatcher;
75 private int nbRows;
76 private int walEditKVs;
77 private final AtomicLong sequenceId = new AtomicLong(1);
78
79 @Parameters
80 public static Collection<Object[]> parameters() {
81
82 int[] NB_ROWS = { 1500, 60000 };
83 int[] NB_KVS = { 1, 100 };
84
85 Boolean[] BOOL_VALS = { false, true };
86 List<Object[]> parameters = new ArrayList<Object[]>();
87 for (int nbRows : NB_ROWS) {
88 for (int walEditKVs : NB_KVS) {
89 for (boolean b : BOOL_VALS) {
90 Object[] arr = new Object[3];
91 arr[0] = nbRows;
92 arr[1] = walEditKVs;
93 arr[2] = b;
94 parameters.add(arr);
95 }
96 }
97 }
98 return parameters;
99 }
100
101 public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
102 this.nbRows = nbRows;
103 this.walEditKVs = walEditKVs;
104 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
105 enableCompression);
106 }
107
108 @BeforeClass
109 public static void setUpBeforeClass() throws Exception {
110 TEST_UTIL = new HBaseTestingUtility();
111 conf = TEST_UTIL.getConfiguration();
112 TEST_UTIL.startMiniDFSCluster(3);
113
114 hbaseDir = TEST_UTIL.createRootDir();
115 cluster = TEST_UTIL.getDFSCluster();
116 fs = cluster.getFileSystem();
117 }
118
119 @AfterClass
120 public static void tearDownAfterClass() throws Exception {
121 TEST_UTIL.shutdownMiniCluster();
122 }
123
124 @Before
125 public void setUp() throws Exception {
126 logManager = new ReplicationHLogReaderManager(fs, conf);
127 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
128 pathWatcher = new PathWatcher();
129 listeners.add(pathWatcher);
130 log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server");
131 }
132
133 @After
134 public void tearDown() throws Exception {
135 log.closeAndDelete();
136 }
137
138 @Test
139 public void test() throws Exception {
140
141 Path path = pathWatcher.currentPath;
142
143 assertEquals(0, logManager.getPosition());
144
145 appendToLog();
146
147
148 assertNotNull(logManager.openReader(path));
149 logManager.seek();
150 HLog.Entry entry = logManager.readNextAndSetPosition();
151 assertNotNull(entry);
152 entry = logManager.readNextAndSetPosition();
153 assertNull(entry);
154 logManager.closeReader();
155 long oldPos = logManager.getPosition();
156
157 appendToLog();
158
159
160 assertNotNull(logManager.openReader(path));
161 logManager.seek();
162 entry = logManager.readNextAndSetPosition();
163 assertNotEquals(oldPos, logManager.getPosition());
164 assertNotNull(entry);
165 logManager.closeReader();
166 oldPos = logManager.getPosition();
167
168 log.rollWriter();
169
170
171 assertNotNull(logManager.openReader(path));
172 logManager.seek();
173 entry = logManager.readNextAndSetPosition();
174 assertEquals(oldPos, logManager.getPosition());
175 assertNull(entry);
176 logManager.finishCurrentFile();
177
178 path = pathWatcher.currentPath;
179
180 for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
181 log.rollWriter();
182 logManager.openReader(path);
183 logManager.seek();
184 for (int i = 0; i < nbRows; i++) {
185 HLog.Entry e = logManager.readNextAndSetPosition();
186 if (e == null) {
187 fail("Should have enough entries");
188 }
189 }
190 }
191
192 private void appendToLog() throws IOException {
193 appendToLogPlus(1);
194 }
195
196 private void appendToLogPlus(int count) throws IOException {
197 log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
198 }
199
200 private WALEdit getWALEdits(int count) {
201 WALEdit edit = new WALEdit();
202 for (int i = 0; i < count; i++) {
203 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
204 System.currentTimeMillis(), qualifier));
205 }
206 return edit;
207 }
208
209 class PathWatcher implements WALActionsListener {
210
211 Path currentPath;
212
213 @Override
214 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
215 currentPath = newPath;
216 }
217
218 @Override
219 public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
220
221 @Override
222 public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
223
224 @Override
225 public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
226
227 @Override
228 public void logRollRequested(boolean tooFewReplicas) {}
229
230 @Override
231 public void logCloseRequested() {}
232
233 @Override
234 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {}
235
236 @Override
237 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {}
238 }
239 }