/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

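/**
 * JUnit tests for the HLogRecordReader and the split/time-range handling of HLogInputFormat.
 */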
@Category(MediumTests.class)
public class TestHLogRecordReader {
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static final TableName tableName = TableName.valueOf(getName());
  private static final byte [] rowName = tableName.getName();
  private static final HRegionInfo info = new HRegionInfo(tableName,
      Bytes.toBytes(""), Bytes.toBytes(""), false);
  private static final byte [] family = Bytes.toBytes("column");
  private static final byte [] value = Bytes.toBytes("value");
  private static HTableDescriptor htd;
  private static Path logDir;
  private static String logName;

  private static String getName() {
    return "TestHLogRecordReader";
  }

  @Before
  public void setUp() throws Exception {
    // Start each test from a clean root directory.
    FileStatus[] entries = fs.listStatus(hbaseDir);
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small and use a single replica.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();

    logName = HConstants.HREGION_LOGDIR_NAME;
    logDir = new Path(hbaseDir, logName);

    htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

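  /**
   * Test partial reads from the log based on the passed time range.
   */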
  @Test
  public void testPartialRead() throws Exception {
    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
    // Write two edits with timestamps ts and ts+1 into the first log file.
    long ts = System.currentTimeMillis();
    WALEdit edit = new WALEdit();
    final AtomicLong sequenceId = new AtomicLong(0);
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
    log.append(info, tableName, edit, ts, htd, sequenceId);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
    log.append(info, tableName, edit, ts + 1, htd, sequenceId);
    log.rollWriter();

    // Make sure the second log file gets a strictly later timestamp.
    Thread.sleep(1);
    long ts1 = System.currentTimeMillis();

    // Write two more edits into the second log file.
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
    log.append(info, tableName, edit, ts1 + 1, htd, sequenceId);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
    log.append(info, tableName, edit, ts1 + 2, htd, sequenceId);
    log.close();

    HLogInputFormat input = new HLogInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapred.input.dir", logDir.toString());
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts);

    // Only the first file is considered, and only its first entry is used.
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts + 1);
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // Both files fall into the time range.
    assertEquals(2, splits.size());
    // Only the second entry of the first file is in range...
    testSplit(splits.get(0), Bytes.toBytes("2"));
    // ...and only the first entry of the second file.
    testSplit(splits.get(1), Bytes.toBytes("3"));
  }

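  /**
   * Test basic functionality of the record reader over both log files.
   */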
  @Test
  public void testHLogRecordReader() throws Exception {
    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
    byte [] value = Bytes.toBytes("value");
    final AtomicLong sequenceId = new AtomicLong(0);
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        System.currentTimeMillis(), value));
    log.append(info, tableName, edit,
        System.currentTimeMillis(), htd, sequenceId);

    // Remember a timestamp between the two log files, then roll the log.
    Thread.sleep(1);
    long secondTs = System.currentTimeMillis();
    log.rollWriter();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
        System.currentTimeMillis(), value));
    log.append(info, tableName, edit,
        System.currentTimeMillis(), htd, sequenceId);
    log.close();
    long thirdTs = System.currentTimeMillis();

    HLogInputFormat input = new HLogInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapred.input.dir", logDir.toString());

    // There should be two log files now, hence two splits.
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // Each split should return exactly one KeyValue.
    testSplit(splits.get(0), Bytes.toBytes("1"));
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // With an end time before the roll, the second log file is ignored completely.
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs - 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // With a start time after the last edit, both logs are still considered,
    // but the readers skip all edits.
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());
    testSplit(splits.get(0));
    testSplit(splits.get(1));
  }

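  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   */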
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    HLogRecordReader reader = new HLogRecordReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      KeyValue kv = reader.getCurrentValue().getKeyValues().get(0);
      if (!Bytes.equals(column, kv.getQualifier())) {
        assertTrue("expected [" + Bytes.toString(column) + "], actual ["
            + Bytes.toString(kv.getQualifier()) + "]", false);
      }
    }
    // The reader must not return any further entries.
    assertFalse(reader.nextKeyValue());
    reader.close();
  }

}