/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests scanning and seeking over data blocks written with the PREFIX_TREE data block encoding.
 */
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestPrefixTreeEncoding {
  private static final Log LOG = LogFactory.getLog(TestPrefixTreeEncoding.class);
  private static final String CF = "EncodingTestCF";
  private static final byte[] CF_BYTES = Bytes.toBytes(CF);
  private static final int NUM_ROWS_PER_BATCH = 50;
  private static final int NUM_COLS_PER_ROW = 20;

  private int numBatchesWritten = 0;
  private ConcurrentSkipListSet<KeyValue> kvset = new ConcurrentSkipListSet<KeyValue>(
      KeyValue.COMPARATOR);

  private static boolean formatRowNum = false;

  @Parameters
  public static Collection<Object[]> parameters() {
    List<Object[]> paramList = new ArrayList<Object[]>();
    paramList.add(new Object[] { false });
    paramList.add(new Object[] { true });
    return paramList;
  }

  private final boolean includesTag;

  public TestPrefixTreeEncoding(boolean includesTag) {
    this.includesTag = includesTag;
  }

  @Before
  public void setUp() throws Exception {
    kvset.clear();
    formatRowNum = false;
  }

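  /**
   * Seeks before the first, a middle, and a past-the-end row of a fully populated fixed data
   * block and verifies where the prefix-tree seeker lands in each case.
   */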
  @Test
  public void testSeekBeforeWithFixedData() throws Exception {
    formatRowNum = true;
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    int batchId = numBatchesWritten++;
    ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId, false, includesTag);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(false)
        .withIncludesMvcc(false)
        .withIncludesTags(includesTag)
        .withCompression(Algorithm.NONE)
        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    seeker.setCurrentBuffer(readBuffer);

    // Seek before the first row: nothing precedes it, so the seeker is positioned on no cell.
    KeyValue seekKey = KeyValue.createFirstDeleteFamilyOnRow(getRowKey(batchId, 0), CF_BYTES);
    seeker.seekToKeyInBlock(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey.getKeyLength(),
        true);
    assertEquals(null, seeker.getKeyValue());

    // Seek before a middle row: the seeker should land on the last cell of the previous row.
    seekKey = KeyValue.createFirstDeleteFamilyOnRow(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3),
        CF_BYTES);
    seeker.seekToKeyInBlock(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey.getKeyLength(),
        true);
    assertNotNull(seeker.getKeyValue());
    assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3 - 1), seeker.getKeyValue().getRow());

    // Seek before a key beyond all rows in the block: the seeker should land on the last row.
    seekKey = KeyValue.createFirstDeleteFamilyOnRow(Bytes.toBytes("zzzz"), CF_BYTES);
    seeker.seekToKeyInBlock(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey.getKeyLength(),
        true);
    assertNotNull(seeker.getKeyValue());
    assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH - 1), seeker.getKeyValue().getRow());
  }

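  /**
   * Encodes a randomly thinned-out batch, then scans the whole block verifying that cells come
   * back in comparator order and that tags are present exactly when the test is parameterized to
   * include them.
   */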
  @Test
  public void testScanWithRandomData() throws Exception {
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    ByteBuffer dataBuffer = generateRandomTestData(kvset, numBatchesWritten++, includesTag);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(false)
        .withIncludesMvcc(false)
        .withIncludesTags(includesTag)
        .withCompression(Algorithm.NONE)
        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    seeker.setCurrentBuffer(readBuffer);
    KeyValue previousKV = null;
    do {
      KeyValue currentKV = seeker.getKeyValue();
      System.out.println(currentKV);
      if (previousKV != null && KeyValue.COMPARATOR.compare(currentKV, previousKV) < 0) {
        dumpInputKVSet();
        fail("Current kv " + currentKV + " is smaller than previous keyvalue " + previousKV);
      }
      if (!includesTag) {
        assertFalse(currentKV.getTagsLengthUnsigned() > 0);
      } else {
        Assert.assertTrue(currentKV.getTagsLengthUnsigned() > 0);
      }
      previousKV = currentKV;
    } while (seeker.next());
  }

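  /**
   * Encodes a randomly thinned-out batch and checks that seeking to the first key of every row
   * agrees with a {@link CollectionBackedScanner} over the same KeyValues.
   */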
  @Test
  public void testSeekWithRandomData() throws Exception {
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    int batchId = numBatchesWritten++;
    ByteBuffer dataBuffer = generateRandomTestData(kvset, batchId, includesTag);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(false)
        .withIncludesMvcc(false)
        .withIncludesTags(includesTag)
        .withCompression(Algorithm.NONE)
        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    verifySeeking(seeker, readBuffer, batchId);
  }

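  /**
   * Same as {@link #testSeekWithRandomData()}, but over a deterministic batch in which alternating
   * groups of ten rows are left out, so some seeks target rows that are absent from the block.
   */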
  @Test
  public void testSeekWithFixedData() throws Exception {
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    int batchId = numBatchesWritten++;
    ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId, includesTag);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(false)
        .withIncludesMvcc(false)
        .withIncludesTags(includesTag)
        .withCompression(Algorithm.NONE)
        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    verifySeeking(seeker, readBuffer, batchId);
  }

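  /**
   * Seeks the encoded seeker to the first key of every row in the batch and cross-checks both the
   * "has more" answer and the cell it lands on against a {@link CollectionBackedScanner} over the
   * input KeyValue set.
   */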
  private void verifySeeking(EncodedSeeker encodeSeeker,
      ByteBuffer encodedData, int batchId) {
    List<KeyValue> kvList = new ArrayList<KeyValue>();
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      kvList.clear();
      encodeSeeker.setCurrentBuffer(encodedData);
      KeyValue firstOnRow = KeyValue.createFirstOnRow(getRowKey(batchId, i));
      encodeSeeker.seekToKeyInBlock(firstOnRow.getBuffer(),
          firstOnRow.getKeyOffset(), firstOnRow.getKeyLength(), false);
      boolean hasMoreOfEncodeScanner = encodeSeeker.next();
      CollectionBackedScanner collectionScanner = new CollectionBackedScanner(
          this.kvset);
      boolean hasMoreOfCollectionScanner = collectionScanner.seek(firstOnRow);
      if (hasMoreOfEncodeScanner != hasMoreOfCollectionScanner) {
        dumpInputKVSet();
        fail("Encoded seeker and collection scanner disagree after seeking " + firstOnRow);
      }
      if (hasMoreOfEncodeScanner) {
        if (KeyValue.COMPARATOR.compare(encodeSeeker.getKeyValue(),
            collectionScanner.peek()) != 0) {
          dumpInputKVSet();
          fail("Expected " + collectionScanner.peek() + " actual "
              + encodeSeeker.getKeyValue() + ", after seeking " + firstOnRow);
        }
      }
    }
  }

  private void dumpInputKVSet() {
    LOG.info("Dumping input keyvalue set in error case:");
    for (KeyValue kv : kvset) {
      System.out.println(kv);
    }
  }

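  /**
   * Generates a fixed batch with alternating groups of ten rows omitted; delegates to the
   * four-argument overload with {@code partial} set to true.
   */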
  private static ByteBuffer generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
      int batchId, boolean useTags) throws Exception {
    return generateFixedTestData(kvset, batchId, true, useTags);
  }

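  /**
   * Fills {@code kvset} with a deterministic batch of NUM_ROWS_PER_BATCH rows and
   * NUM_COLS_PER_ROW columns (omitting alternating groups of ten rows when {@code partial} is
   * true) and serializes every KeyValue in the set as key length, value length, key bytes, value
   * bytes and, when {@code useTags} is set, tag length plus tag bytes.
   */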
  private static ByteBuffer generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
      int batchId, boolean partial, boolean useTags) throws Exception {
    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      if (partial && i / 10 % 2 == 1) {
        continue;
      }
      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
        if (!useTags) {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j),
              getValue(batchId, i, j));
          kvset.add(kv);
        } else {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), 0L,
              getValue(batchId, i, j), new Tag[] { new Tag((byte) 1, "metaValue1") });
          kvset.add(kv);
        }
      }
    }
    for (KeyValue kv : kvset) {
      userDataStream.writeInt(kv.getKeyLength());
      userDataStream.writeInt(kv.getValueLength());
      userDataStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
      userDataStream.write(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
      if (useTags) {
        userDataStream.writeShort(kv.getTagsLengthUnsigned());
        userDataStream.write(kv.getBuffer(), kv.getValueOffset() + kv.getValueLength()
            + Bytes.SIZEOF_SHORT, kv.getTagsLengthUnsigned());
      }
    }
    return ByteBuffer.wrap(baosInMemory.toByteArray());
  }

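  /**
   * Like the fixed-data generator, but randomly drops roughly half of the rows and half of the
   * columns within each surviving row before serializing the resulting KeyValue set.
   */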
  private static ByteBuffer generateRandomTestData(ConcurrentSkipListSet<KeyValue> kvset,
      int batchId, boolean useTags) throws Exception {
    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    Random random = new Random();
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      if (random.nextInt(100) < 50) {
        continue;
      }
      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
        if (random.nextInt(100) < 50) {
          continue;
        }
        if (!useTags) {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j),
              getValue(batchId, i, j));
          kvset.add(kv);
        } else {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), 0L,
              getValue(batchId, i, j), new Tag[] { new Tag((byte) 1, "metaValue1") });
          kvset.add(kv);
        }
      }
    }

    for (KeyValue kv : kvset) {
      userDataStream.writeInt(kv.getKeyLength());
      userDataStream.writeInt(kv.getValueLength());
      userDataStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
      userDataStream.write(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
      if (useTags) {
        userDataStream.writeShort(kv.getTagsLengthUnsigned());
        userDataStream.write(kv.getBuffer(), kv.getValueOffset() + kv.getValueLength()
            + Bytes.SIZEOF_SHORT, kv.getTagsLengthUnsigned());
      }
    }
    return ByteBuffer.wrap(baosInMemory.toByteArray());
  }

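  /**
   * Row keys sort lexicographically, so {@link #testSeekBeforeWithFixedData()} sets
   * {@code formatRowNum} to zero-pad the row number and keep lexicographic order in line with
   * numeric order.
   */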
  private static byte[] getRowKey(int batchId, int i) {
    return Bytes
        .toBytes("batch" + batchId + "_row" + (formatRowNum ? String.format("%04d", i) : i));
  }

  private static byte[] getQualifier(int j) {
    return Bytes.toBytes("colfdfafhfhsdfhsdfh" + j);
  }

  private static byte[] getValue(int batchId, int i, int j) {
    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j);
  }

}