1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellScanner;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.Tag;
40 import org.apache.hadoop.hbase.client.Append;
41 import org.apache.hadoop.hbase.client.Durability;
42 import org.apache.hadoop.hbase.client.HBaseAdmin;
43 import org.apache.hadoop.hbase.client.HTable;
44 import org.apache.hadoop.hbase.client.Increment;
45 import org.apache.hadoop.hbase.client.Mutation;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.ResultScanner;
49 import org.apache.hadoop.hbase.client.Scan;
50 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
51 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
56 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.junit.After;
59 import org.junit.AfterClass;
60 import org.junit.BeforeClass;
61 import org.junit.Rule;
62 import org.junit.Test;
63 import org.junit.experimental.categories.Category;
64 import org.junit.rules.TestName;
65
66
67
68
69 @Category(MediumTests.class)
70 public class TestTags {
71 static boolean useFilter = false;
72
73 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74
75 @Rule
76 public final TestName TEST_NAME = new TestName();
77
78 @BeforeClass
79 public static void setUpBeforeClass() throws Exception {
80 Configuration conf = TEST_UTIL.getConfiguration();
81 conf.setInt("hfile.format.version", 3);
82 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
83 TestCoprocessorForTags.class.getName());
84 TEST_UTIL.startMiniCluster(1, 2);
85 }
86
87 @AfterClass
88 public static void tearDownAfterClass() throws Exception {
89 TEST_UTIL.shutdownMiniCluster();
90 }
91
92 @After
93 public void tearDown() {
94 useFilter = false;
95 }
96
97 @Test
98 public void testTags() throws Exception {
99 HTable table = null;
100 try {
101 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
102 byte[] fam = Bytes.toBytes("info");
103 byte[] row = Bytes.toBytes("rowa");
104
105 byte[] qual = Bytes.toBytes("qual");
106
107 byte[] row1 = Bytes.toBytes("rowb");
108
109 byte[] row2 = Bytes.toBytes("rowc");
110
111 HTableDescriptor desc = new HTableDescriptor(tableName);
112 HColumnDescriptor colDesc = new HColumnDescriptor(fam);
113 colDesc.setBlockCacheEnabled(true);
114
115 colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
116 desc.addFamily(colDesc);
117 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
118 admin.createTable(desc);
119 byte[] value = Bytes.toBytes("value");
120 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
121 Put put = new Put(row);
122 put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
123 put.setAttribute("visibility", Bytes.toBytes("myTag"));
124 table.put(put);
125 admin.flush(tableName.getName());
126
127
128
129
130
131 Thread.sleep(1000);
132
133 Put put1 = new Put(row1);
134 byte[] value1 = Bytes.toBytes("1000dfsdf");
135 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
136
137 table.put(put1);
138 admin.flush(tableName.getName());
139 Thread.sleep(1000);
140
141 Put put2 = new Put(row2);
142 byte[] value2 = Bytes.toBytes("1000dfsdf");
143 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
144 put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
145 table.put(put2);
146 admin.flush(tableName.getName());
147 Thread.sleep(1000);
148
149 result(fam, row, qual, row2, table, value, value2, row1, value1);
150 admin.compact(tableName.getName());
151 while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
152 Thread.sleep(10);
153 }
154 result(fam, row, qual, row2, table, value, value2, row1, value1);
155 } finally {
156 if (table != null) {
157 table.close();
158 }
159 }
160 }
161
162 @Test
163 public void testFlushAndCompactionWithoutTags() throws Exception {
164 HTable table = null;
165 try {
166 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
167 byte[] fam = Bytes.toBytes("info");
168 byte[] row = Bytes.toBytes("rowa");
169
170 byte[] qual = Bytes.toBytes("qual");
171
172 byte[] row1 = Bytes.toBytes("rowb");
173
174 byte[] row2 = Bytes.toBytes("rowc");
175
176 HTableDescriptor desc = new HTableDescriptor(tableName);
177 HColumnDescriptor colDesc = new HColumnDescriptor(fam);
178 colDesc.setBlockCacheEnabled(true);
179
180 colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
181 desc.addFamily(colDesc);
182 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
183 admin.createTable(desc);
184
185 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
186 Put put = new Put(row);
187 byte[] value = Bytes.toBytes("value");
188 put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
189 table.put(put);
190 admin.flush(tableName.getName());
191
192
193
194
195
196 Thread.sleep(1000);
197
198 Put put1 = new Put(row1);
199 byte[] value1 = Bytes.toBytes("1000dfsdf");
200 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
201 table.put(put1);
202 admin.flush(tableName.getName());
203 Thread.sleep(1000);
204
205 Put put2 = new Put(row2);
206 byte[] value2 = Bytes.toBytes("1000dfsdf");
207 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
208 table.put(put2);
209 admin.flush(tableName.getName());
210 Thread.sleep(1000);
211
212 Scan s = new Scan(row);
213 ResultScanner scanner = table.getScanner(s);
214 try {
215 Result[] next = scanner.next(3);
216 for (Result result : next) {
217 CellScanner cellScanner = result.cellScanner();
218 cellScanner.advance();
219 KeyValue current = (KeyValue) cellScanner.current();
220 assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
221 }
222 } finally {
223 if (scanner != null)
224 scanner.close();
225 }
226 admin.compact(tableName.getName());
227 while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
228 Thread.sleep(10);
229 }
230 s = new Scan(row);
231 scanner = table.getScanner(s);
232 try {
233 Result[] next = scanner.next(3);
234 for (Result result : next) {
235 CellScanner cellScanner = result.cellScanner();
236 cellScanner.advance();
237 KeyValue current = (KeyValue) cellScanner.current();
238 assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
239 }
240 } finally {
241 if (scanner != null) {
242 scanner.close();
243 }
244 }
245 } finally {
246 if (table != null) {
247 table.close();
248 }
249 }
250 }
251
252 @Test
253 public void testFlushAndCompactionwithCombinations() throws Exception {
254 HTable table = null;
255 try {
256 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
257 byte[] fam = Bytes.toBytes("info");
258 byte[] row = Bytes.toBytes("rowa");
259
260 byte[] qual = Bytes.toBytes("qual");
261
262 byte[] row1 = Bytes.toBytes("rowb");
263
264 byte[] row2 = Bytes.toBytes("rowc");
265 byte[] rowd = Bytes.toBytes("rowd");
266 byte[] rowe = Bytes.toBytes("rowe");
267
268 HTableDescriptor desc = new HTableDescriptor(tableName);
269 HColumnDescriptor colDesc = new HColumnDescriptor(fam);
270 colDesc.setBlockCacheEnabled(true);
271
272 colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
273 desc.addFamily(colDesc);
274 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
275 admin.createTable(desc);
276
277 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
278 Put put = new Put(row);
279 byte[] value = Bytes.toBytes("value");
280 put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
281 int bigTagLen = Short.MAX_VALUE + 5;
282 put.setAttribute("visibility", new byte[bigTagLen]);
283 table.put(put);
284 Put put1 = new Put(row1);
285 byte[] value1 = Bytes.toBytes("1000dfsdf");
286 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
287 table.put(put1);
288 admin.flush(tableName.getName());
289
290
291
292
293
294 Thread.sleep(1000);
295
296 put1 = new Put(row2);
297 value1 = Bytes.toBytes("1000dfsdf");
298 put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
299 table.put(put1);
300 admin.flush(tableName.getName());
301 Thread.sleep(1000);
302
303 Put put2 = new Put(rowd);
304 byte[] value2 = Bytes.toBytes("1000dfsdf");
305 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
306 table.put(put2);
307 put2 = new Put(rowe);
308 value2 = Bytes.toBytes("1000dfsddfdf");
309 put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
310 put.setAttribute("visibility", Bytes.toBytes("ram"));
311 table.put(put2);
312 admin.flush(tableName.getName());
313 Thread.sleep(1000);
314
315 TestCoprocessorForTags.checkTagPresence = true;
316 Scan s = new Scan(row);
317 s.setCaching(1);
318 ResultScanner scanner = table.getScanner(s);
319 try {
320 Result next = null;
321 while ((next = scanner.next()) != null) {
322 CellScanner cellScanner = next.cellScanner();
323 cellScanner.advance();
324 KeyValue current = (KeyValue) cellScanner.current();
325 if (CellUtil.matchingRow(current, row)) {
326 assertEquals(1, TestCoprocessorForTags.tags.size());
327 Tag tag = TestCoprocessorForTags.tags.get(0);
328 assertEquals(bigTagLen, tag.getTagLength());
329 } else {
330 assertEquals(0, TestCoprocessorForTags.tags.size());
331 }
332 }
333 } finally {
334 if (scanner != null) {
335 scanner.close();
336 }
337 TestCoprocessorForTags.checkTagPresence = false;
338 }
339 while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
340 Thread.sleep(10);
341 }
342 TestCoprocessorForTags.checkTagPresence = true;
343 scanner = table.getScanner(s);
344 try {
345 Result next = null;
346 while ((next = scanner.next()) != null) {
347 CellScanner cellScanner = next.cellScanner();
348 cellScanner.advance();
349 KeyValue current = (KeyValue) cellScanner.current();
350 if (CellUtil.matchingRow(current, row)) {
351 assertEquals(1, TestCoprocessorForTags.tags.size());
352 Tag tag = TestCoprocessorForTags.tags.get(0);
353 assertEquals(bigTagLen, tag.getTagLength());
354 } else {
355 assertEquals(0, TestCoprocessorForTags.tags.size());
356 }
357 }
358 } finally {
359 if (scanner != null) {
360 scanner.close();
361 }
362 TestCoprocessorForTags.checkTagPresence = false;
363 }
364 } finally {
365 if (table != null) {
366 table.close();
367 }
368 }
369 }
370
371 @Test
372 public void testTagsWithAppendAndIncrement() throws Exception {
373 TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
374 byte[] f = Bytes.toBytes("f");
375 byte[] q = Bytes.toBytes("q");
376 byte[] row1 = Bytes.toBytes("r1");
377 byte[] row2 = Bytes.toBytes("r2");
378
379 HTableDescriptor desc = new HTableDescriptor(tableName);
380 HColumnDescriptor colDesc = new HColumnDescriptor(f);
381 desc.addFamily(colDesc);
382 TEST_UTIL.getHBaseAdmin().createTable(desc);
383
384 HTable table = null;
385 try {
386 table = new HTable(TEST_UTIL.getConfiguration(), tableName);
387 Put put = new Put(row1);
388 byte[] v = Bytes.toBytes(2L);
389 put.add(f, q, v);
390 put.setAttribute("visibility", Bytes.toBytes("tag1"));
391 table.put(put);
392 Increment increment = new Increment(row1);
393 increment.addColumn(f, q, 1L);
394 table.increment(increment);
395 TestCoprocessorForTags.checkTagPresence = true;
396 ResultScanner scanner = table.getScanner(new Scan());
397 Result result = scanner.next();
398 KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
399 List<Tag> tags = TestCoprocessorForTags.tags;
400 assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
401 assertEquals(1, tags.size());
402 assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
403 TestCoprocessorForTags.checkTagPresence = false;
404 TestCoprocessorForTags.tags = null;
405
406 increment = new Increment(row1);
407 increment.add(new KeyValue(row1, f, q, 1234L, v));
408 increment.setAttribute("visibility", Bytes.toBytes("tag2"));
409 table.increment(increment);
410 TestCoprocessorForTags.checkTagPresence = true;
411 scanner = table.getScanner(new Scan());
412 result = scanner.next();
413 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
414 tags = TestCoprocessorForTags.tags;
415 assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
416 assertEquals(2, tags.size());
417
418 List<String> tagValues = new ArrayList<String>();
419 for (Tag tag: tags) {
420 tagValues.add(Bytes.toString(tag.getValue()));
421 }
422 assertTrue(tagValues.contains("tag1"));
423 assertTrue(tagValues.contains("tag2"));
424 TestCoprocessorForTags.checkTagPresence = false;
425 TestCoprocessorForTags.tags = null;
426
427 put = new Put(row2);
428 v = Bytes.toBytes(2L);
429 put.add(f, q, v);
430 table.put(put);
431 increment = new Increment(row2);
432 increment.add(new KeyValue(row2, f, q, 1234L, v));
433 increment.setAttribute("visibility", Bytes.toBytes("tag2"));
434 table.increment(increment);
435 Scan scan = new Scan();
436 scan.setStartRow(row2);
437 TestCoprocessorForTags.checkTagPresence = true;
438 scanner = table.getScanner(scan);
439 result = scanner.next();
440 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
441 tags = TestCoprocessorForTags.tags;
442 assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
443 assertEquals(1, tags.size());
444 assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
445 TestCoprocessorForTags.checkTagPresence = false;
446 TestCoprocessorForTags.tags = null;
447
448
449 byte[] row3 = Bytes.toBytes("r3");
450 put = new Put(row3);
451 put.add(f, q, Bytes.toBytes("a"));
452 put.setAttribute("visibility", Bytes.toBytes("tag1"));
453 table.put(put);
454 Append append = new Append(row3);
455 append.add(f, q, Bytes.toBytes("b"));
456 table.append(append);
457 scan = new Scan();
458 scan.setStartRow(row3);
459 TestCoprocessorForTags.checkTagPresence = true;
460 scanner = table.getScanner(scan);
461 result = scanner.next();
462 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
463 tags = TestCoprocessorForTags.tags;
464 assertEquals(1, tags.size());
465 assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
466 TestCoprocessorForTags.checkTagPresence = false;
467 TestCoprocessorForTags.tags = null;
468
469 append = new Append(row3);
470 append.add(new KeyValue(row3, f, q, 1234L, v));
471 append.setAttribute("visibility", Bytes.toBytes("tag2"));
472 table.append(append);
473 TestCoprocessorForTags.checkTagPresence = true;
474 scanner = table.getScanner(scan);
475 result = scanner.next();
476 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
477 tags = TestCoprocessorForTags.tags;
478 assertEquals(2, tags.size());
479
480 tagValues.clear();
481 for (Tag tag: tags) {
482 tagValues.add(Bytes.toString(tag.getValue()));
483 }
484 assertTrue(tagValues.contains("tag1"));
485 assertTrue(tagValues.contains("tag2"));
486 TestCoprocessorForTags.checkTagPresence = false;
487 TestCoprocessorForTags.tags = null;
488
489 byte[] row4 = Bytes.toBytes("r4");
490 put = new Put(row4);
491 put.add(f, q, Bytes.toBytes("a"));
492 table.put(put);
493 append = new Append(row4);
494 append.add(new KeyValue(row4, f, q, 1234L, v));
495 append.setAttribute("visibility", Bytes.toBytes("tag2"));
496 table.append(append);
497 scan = new Scan();
498 scan.setStartRow(row4);
499 TestCoprocessorForTags.checkTagPresence = true;
500 scanner = table.getScanner(scan);
501 result = scanner.next();
502 kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
503 tags = TestCoprocessorForTags.tags;
504 assertEquals(1, tags.size());
505 assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
506 } finally {
507 TestCoprocessorForTags.checkTagPresence = false;
508 TestCoprocessorForTags.tags = null;
509 if (table != null) {
510 table.close();
511 }
512 }
513 }
514
515 private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, HTable table, byte[] value,
516 byte[] value2, byte[] row1, byte[] value1) throws IOException {
517 Scan s = new Scan(row);
518
519
520
521 s.setAttribute("visibility", Bytes.toBytes("myTag"));
522 ResultScanner scanner = null;
523 try {
524 scanner = table.getScanner(s);
525 Result next = scanner.next();
526
527 assertTrue(Bytes.equals(next.getRow(), row));
528 assertTrue(Bytes.equals(next.getValue(fam, qual), value));
529
530 Result next2 = scanner.next();
531 assertTrue(next2 != null);
532 assertTrue(Bytes.equals(next2.getRow(), row1));
533 assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
534
535 next2 = scanner.next();
536 assertTrue(next2 != null);
537 assertTrue(Bytes.equals(next2.getRow(), row2));
538 assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
539
540 } finally {
541 if (scanner != null)
542 scanner.close();
543 }
544 }
545
546 public static class TestCoprocessorForTags extends BaseRegionObserver {
547
548 public static boolean checkTagPresence = false;
549 public static List<Tag> tags = null;
550
551 @Override
552 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
553 final WALEdit edit, final Durability durability) throws IOException {
554 updateMutationAddingTags(put);
555 }
556
557 private void updateMutationAddingTags(final Mutation m) {
558 byte[] attribute = m.getAttribute("visibility");
559 byte[] cf = null;
560 List<Cell> updatedCells = new ArrayList<Cell>();
561 if (attribute != null) {
562 for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
563 for (Cell cell : edits) {
564 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
565 if (cf == null) {
566 cf = kv.getFamily();
567 }
568 Tag tag = new Tag((byte) 1, attribute);
569 List<Tag> tagList = new ArrayList<Tag>();
570 tagList.add(tag);
571
572 KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
573 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
574 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
575 kv.getValueLength(), tagList);
576 ((List<Cell>) updatedCells).add(newKV);
577 }
578 }
579 m.getFamilyCellMap().remove(cf);
580
581 m.getFamilyCellMap().put(cf, updatedCells);
582 }
583 }
584
585 @Override
586 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
587 throws IOException {
588 updateMutationAddingTags(increment);
589 return super.preIncrement(e, increment);
590 }
591
592 @Override
593 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
594 throws IOException {
595 updateMutationAddingTags(append);
596 return super.preAppend(e, append);
597 }
598
599 @Override
600 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
601 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
602 if (checkTagPresence) {
603 if (results.size() > 0) {
604
605 Result result = results.get(0);
606 CellScanner cellScanner = result.cellScanner();
607 if (cellScanner.advance()) {
608 Cell cell = cellScanner.current();
609 tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
610 cell.getTagsLengthUnsigned());
611 }
612 }
613 }
614 return hasMore;
615 }
616 }
617 }