View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.Tag;
40  import org.apache.hadoop.hbase.client.Append;
41  import org.apache.hadoop.hbase.client.Durability;
42  import org.apache.hadoop.hbase.client.HBaseAdmin;
43  import org.apache.hadoop.hbase.client.HTable;
44  import org.apache.hadoop.hbase.client.Increment;
45  import org.apache.hadoop.hbase.client.Mutation;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.ResultScanner;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
51  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
56  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.junit.After;
59  import org.junit.AfterClass;
60  import org.junit.BeforeClass;
61  import org.junit.Rule;
62  import org.junit.Test;
63  import org.junit.experimental.categories.Category;
64  import org.junit.rules.TestName;
65  
66  /**
67   * Class that tests tags
68   */
69  @Category(MediumTests.class)
70  public class TestTags {
71    static boolean useFilter = false;
72  
73    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74  
75    @Rule
76    public final TestName TEST_NAME = new TestName();
77  
78    @BeforeClass
79    public static void setUpBeforeClass() throws Exception {
80      Configuration conf = TEST_UTIL.getConfiguration();
81      conf.setInt("hfile.format.version", 3);
82      conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
83          TestCoprocessorForTags.class.getName());
84      TEST_UTIL.startMiniCluster(1, 2);
85    }
86  
87    @AfterClass
88    public static void tearDownAfterClass() throws Exception {
89      TEST_UTIL.shutdownMiniCluster();
90    }
91  
92    @After
93    public void tearDown() {
94      useFilter = false;
95    }
96  
97    @Test
98    public void testTags() throws Exception {
99      HTable table = null;
100     try {
101       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
102       byte[] fam = Bytes.toBytes("info");
103       byte[] row = Bytes.toBytes("rowa");
104       // column names
105       byte[] qual = Bytes.toBytes("qual");
106 
107       byte[] row1 = Bytes.toBytes("rowb");
108 
109       byte[] row2 = Bytes.toBytes("rowc");
110 
111       HTableDescriptor desc = new HTableDescriptor(tableName);
112       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
113       colDesc.setBlockCacheEnabled(true);
114       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
115       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
116       desc.addFamily(colDesc);
117       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
118       admin.createTable(desc);
119       byte[] value = Bytes.toBytes("value");
120       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
121       Put put = new Put(row);
122       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
123       put.setAttribute("visibility", Bytes.toBytes("myTag"));
124       table.put(put);
125       admin.flush(tableName.getName());
126       // We are lacking an API for confirming flush request compaction.
127       // Just sleep for a short time. We won't be able to confirm flush
128       // completion but the test won't hang now or in the future if
129       // default compaction policy causes compaction between flush and
130       // when we go to confirm it.
131       Thread.sleep(1000);
132 
133       Put put1 = new Put(row1);
134       byte[] value1 = Bytes.toBytes("1000dfsdf");
135       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
136       // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
137       table.put(put1);
138       admin.flush(tableName.getName());
139       Thread.sleep(1000);
140 
141       Put put2 = new Put(row2);
142       byte[] value2 = Bytes.toBytes("1000dfsdf");
143       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
144       put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
145       table.put(put2);
146       admin.flush(tableName.getName());
147       Thread.sleep(1000);
148 
149       result(fam, row, qual, row2, table, value, value2, row1, value1);
150       admin.compact(tableName.getName());
151       while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
152         Thread.sleep(10);
153       }
154       result(fam, row, qual, row2, table, value, value2, row1, value1);
155     } finally {
156       if (table != null) {
157         table.close();
158       }
159     }
160   }
161 
162   @Test
163   public void testFlushAndCompactionWithoutTags() throws Exception {
164     HTable table = null;
165     try {
166       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
167       byte[] fam = Bytes.toBytes("info");
168       byte[] row = Bytes.toBytes("rowa");
169       // column names
170       byte[] qual = Bytes.toBytes("qual");
171 
172       byte[] row1 = Bytes.toBytes("rowb");
173 
174       byte[] row2 = Bytes.toBytes("rowc");
175 
176       HTableDescriptor desc = new HTableDescriptor(tableName);
177       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
178       colDesc.setBlockCacheEnabled(true);
179       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
180       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
181       desc.addFamily(colDesc);
182       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
183       admin.createTable(desc);
184 
185       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
186       Put put = new Put(row);
187       byte[] value = Bytes.toBytes("value");
188       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
189       table.put(put);
190       admin.flush(tableName.getName());
191       // We are lacking an API for confirming flush request compaction.
192       // Just sleep for a short time. We won't be able to confirm flush
193       // completion but the test won't hang now or in the future if
194       // default compaction policy causes compaction between flush and
195       // when we go to confirm it.
196       Thread.sleep(1000);
197 
198       Put put1 = new Put(row1);
199       byte[] value1 = Bytes.toBytes("1000dfsdf");
200       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
201       table.put(put1);
202       admin.flush(tableName.getName());
203       Thread.sleep(1000);
204 
205       Put put2 = new Put(row2);
206       byte[] value2 = Bytes.toBytes("1000dfsdf");
207       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
208       table.put(put2);
209       admin.flush(tableName.getName());
210       Thread.sleep(1000);
211 
212       Scan s = new Scan(row);
213       ResultScanner scanner = table.getScanner(s);
214       try {
215         Result[] next = scanner.next(3);
216         for (Result result : next) {
217           CellScanner cellScanner = result.cellScanner();
218           cellScanner.advance();
219           KeyValue current = (KeyValue) cellScanner.current();
220           assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
221         }
222       } finally {
223         if (scanner != null)
224           scanner.close();
225       }
226       admin.compact(tableName.getName());
227       while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
228         Thread.sleep(10);
229       }
230       s = new Scan(row);
231       scanner = table.getScanner(s);
232       try {
233         Result[] next = scanner.next(3);
234         for (Result result : next) {
235           CellScanner cellScanner = result.cellScanner();
236           cellScanner.advance();
237           KeyValue current = (KeyValue) cellScanner.current();
238           assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
239         }
240       } finally {
241         if (scanner != null) {
242           scanner.close();
243         }
244       }
245     } finally {
246       if (table != null) {
247         table.close();
248       }
249     }
250   }
251 
252   @Test
253   public void testFlushAndCompactionwithCombinations() throws Exception {
254     HTable table = null;
255     try {
256       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
257       byte[] fam = Bytes.toBytes("info");
258       byte[] row = Bytes.toBytes("rowa");
259       // column names
260       byte[] qual = Bytes.toBytes("qual");
261 
262       byte[] row1 = Bytes.toBytes("rowb");
263 
264       byte[] row2 = Bytes.toBytes("rowc");
265       byte[] rowd = Bytes.toBytes("rowd");
266       byte[] rowe = Bytes.toBytes("rowe");
267 
268       HTableDescriptor desc = new HTableDescriptor(tableName);
269       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
270       colDesc.setBlockCacheEnabled(true);
271       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
272       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
273       desc.addFamily(colDesc);
274       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
275       admin.createTable(desc);
276 
277       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
278       Put put = new Put(row);
279       byte[] value = Bytes.toBytes("value");
280       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
281       int bigTagLen = Short.MAX_VALUE + 5;
282       put.setAttribute("visibility", new byte[bigTagLen]);
283       table.put(put);
284       Put put1 = new Put(row1);
285       byte[] value1 = Bytes.toBytes("1000dfsdf");
286       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
287       table.put(put1);
288       admin.flush(tableName.getName());
289       // We are lacking an API for confirming flush request compaction.
290       // Just sleep for a short time. We won't be able to confirm flush
291       // completion but the test won't hang now or in the future if
292       // default compaction policy causes compaction between flush and
293       // when we go to confirm it.
294       Thread.sleep(1000);
295 
296       put1 = new Put(row2);
297       value1 = Bytes.toBytes("1000dfsdf");
298       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
299       table.put(put1);
300       admin.flush(tableName.getName());
301       Thread.sleep(1000);
302 
303       Put put2 = new Put(rowd);
304       byte[] value2 = Bytes.toBytes("1000dfsdf");
305       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
306       table.put(put2);
307       put2 = new Put(rowe);
308       value2 = Bytes.toBytes("1000dfsddfdf");
309       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
310       put.setAttribute("visibility", Bytes.toBytes("ram"));
311       table.put(put2);
312       admin.flush(tableName.getName());
313       Thread.sleep(1000);
314 
315       TestCoprocessorForTags.checkTagPresence = true;
316       Scan s = new Scan(row);
317       s.setCaching(1);
318       ResultScanner scanner = table.getScanner(s);
319       try {
320         Result next = null;
321         while ((next = scanner.next()) != null) {
322           CellScanner cellScanner = next.cellScanner();
323           cellScanner.advance();
324           KeyValue current = (KeyValue) cellScanner.current();
325           if (CellUtil.matchingRow(current, row)) {
326             assertEquals(1, TestCoprocessorForTags.tags.size());
327             Tag tag = TestCoprocessorForTags.tags.get(0);
328             assertEquals(bigTagLen, tag.getTagLength());
329           } else {
330             assertEquals(0, TestCoprocessorForTags.tags.size());
331           }
332         }
333       } finally {
334         if (scanner != null) {
335           scanner.close();
336         }
337         TestCoprocessorForTags.checkTagPresence = false;
338       }
339       while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
340         Thread.sleep(10);
341       }
342       TestCoprocessorForTags.checkTagPresence = true;
343       scanner = table.getScanner(s);
344       try {
345         Result next = null;
346         while ((next = scanner.next()) != null) {
347           CellScanner cellScanner = next.cellScanner();
348           cellScanner.advance();
349           KeyValue current = (KeyValue) cellScanner.current();
350           if (CellUtil.matchingRow(current, row)) {
351             assertEquals(1, TestCoprocessorForTags.tags.size());
352             Tag tag = TestCoprocessorForTags.tags.get(0);
353             assertEquals(bigTagLen, tag.getTagLength());
354           } else {
355             assertEquals(0, TestCoprocessorForTags.tags.size());
356           }
357         }
358       } finally {
359         if (scanner != null) {
360           scanner.close();
361         }
362         TestCoprocessorForTags.checkTagPresence = false;
363       }
364     } finally {
365       if (table != null) {
366         table.close();
367       }
368     }
369   }
370 
371   @Test
372   public void testTagsWithAppendAndIncrement() throws Exception {
373     TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
374     byte[] f = Bytes.toBytes("f");
375     byte[] q = Bytes.toBytes("q");
376     byte[] row1 = Bytes.toBytes("r1");
377     byte[] row2 = Bytes.toBytes("r2");
378 
379     HTableDescriptor desc = new HTableDescriptor(tableName);
380     HColumnDescriptor colDesc = new HColumnDescriptor(f);
381     desc.addFamily(colDesc);
382     TEST_UTIL.getHBaseAdmin().createTable(desc);
383 
384     HTable table = null;
385     try {
386       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
387       Put put = new Put(row1);
388       byte[] v = Bytes.toBytes(2L);
389       put.add(f, q, v);
390       put.setAttribute("visibility", Bytes.toBytes("tag1"));
391       table.put(put);
392       Increment increment = new Increment(row1);
393       increment.addColumn(f, q, 1L);
394       table.increment(increment);
395       TestCoprocessorForTags.checkTagPresence = true;
396       ResultScanner scanner = table.getScanner(new Scan());
397       Result result = scanner.next();
398       KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
399       List<Tag> tags = TestCoprocessorForTags.tags;
400       assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
401       assertEquals(1, tags.size());
402       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
403       TestCoprocessorForTags.checkTagPresence = false;
404       TestCoprocessorForTags.tags = null;
405 
406       increment = new Increment(row1);
407       increment.add(new KeyValue(row1, f, q, 1234L, v));
408       increment.setAttribute("visibility", Bytes.toBytes("tag2"));
409       table.increment(increment);
410       TestCoprocessorForTags.checkTagPresence = true;
411       scanner = table.getScanner(new Scan());
412       result = scanner.next();
413       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
414       tags = TestCoprocessorForTags.tags;
415       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
416       assertEquals(2, tags.size());
417       // We cannot assume the ordering of tags
418       List<String> tagValues = new ArrayList<String>();
419       for (Tag tag: tags) {
420         tagValues.add(Bytes.toString(tag.getValue()));
421       }
422       assertTrue(tagValues.contains("tag1"));
423       assertTrue(tagValues.contains("tag2"));
424       TestCoprocessorForTags.checkTagPresence = false;
425       TestCoprocessorForTags.tags = null;
426 
427       put = new Put(row2);
428       v = Bytes.toBytes(2L);
429       put.add(f, q, v);
430       table.put(put);
431       increment = new Increment(row2);
432       increment.add(new KeyValue(row2, f, q, 1234L, v));
433       increment.setAttribute("visibility", Bytes.toBytes("tag2"));
434       table.increment(increment);
435       Scan scan = new Scan();
436       scan.setStartRow(row2);
437       TestCoprocessorForTags.checkTagPresence = true;
438       scanner = table.getScanner(scan);
439       result = scanner.next();
440       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
441       tags = TestCoprocessorForTags.tags;
442       assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
443       assertEquals(1, tags.size());
444       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
445       TestCoprocessorForTags.checkTagPresence = false;
446       TestCoprocessorForTags.tags = null;
447 
448       // Test Append
449       byte[] row3 = Bytes.toBytes("r3");
450       put = new Put(row3);
451       put.add(f, q, Bytes.toBytes("a"));
452       put.setAttribute("visibility", Bytes.toBytes("tag1"));
453       table.put(put);
454       Append append = new Append(row3);
455       append.add(f, q, Bytes.toBytes("b"));
456       table.append(append);
457       scan = new Scan();
458       scan.setStartRow(row3);
459       TestCoprocessorForTags.checkTagPresence = true;
460       scanner = table.getScanner(scan);
461       result = scanner.next();
462       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
463       tags = TestCoprocessorForTags.tags;
464       assertEquals(1, tags.size());
465       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
466       TestCoprocessorForTags.checkTagPresence = false;
467       TestCoprocessorForTags.tags = null;
468 
469       append = new Append(row3);
470       append.add(new KeyValue(row3, f, q, 1234L, v));
471       append.setAttribute("visibility", Bytes.toBytes("tag2"));
472       table.append(append);
473       TestCoprocessorForTags.checkTagPresence = true;
474       scanner = table.getScanner(scan);
475       result = scanner.next();
476       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
477       tags = TestCoprocessorForTags.tags;
478       assertEquals(2, tags.size());
479       // We cannot assume the ordering of tags
480       tagValues.clear();
481       for (Tag tag: tags) {
482         tagValues.add(Bytes.toString(tag.getValue()));
483       }
484       assertTrue(tagValues.contains("tag1"));
485       assertTrue(tagValues.contains("tag2"));
486       TestCoprocessorForTags.checkTagPresence = false;
487       TestCoprocessorForTags.tags = null;
488 
489       byte[] row4 = Bytes.toBytes("r4");
490       put = new Put(row4);
491       put.add(f, q, Bytes.toBytes("a"));
492       table.put(put);
493       append = new Append(row4);
494       append.add(new KeyValue(row4, f, q, 1234L, v));
495       append.setAttribute("visibility", Bytes.toBytes("tag2"));
496       table.append(append);
497       scan = new Scan();
498       scan.setStartRow(row4);
499       TestCoprocessorForTags.checkTagPresence = true;
500       scanner = table.getScanner(scan);
501       result = scanner.next();
502       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
503       tags = TestCoprocessorForTags.tags;
504       assertEquals(1, tags.size());
505       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
506     } finally {
507       TestCoprocessorForTags.checkTagPresence = false;
508       TestCoprocessorForTags.tags = null;
509       if (table != null) {
510         table.close();
511       }
512     }
513   }
514 
515   private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, HTable table, byte[] value,
516       byte[] value2, byte[] row1, byte[] value1) throws IOException {
517     Scan s = new Scan(row);
518     // If filters are used this attribute can be specifically check for in
519     // filterKV method and
520     // kvs can be filtered out if the tags of interest is not found in that kv
521     s.setAttribute("visibility", Bytes.toBytes("myTag"));
522     ResultScanner scanner = null;
523     try {
524       scanner = table.getScanner(s);
525       Result next = scanner.next();
526 
527       assertTrue(Bytes.equals(next.getRow(), row));
528       assertTrue(Bytes.equals(next.getValue(fam, qual), value));
529 
530       Result next2 = scanner.next();
531       assertTrue(next2 != null);
532       assertTrue(Bytes.equals(next2.getRow(), row1));
533       assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
534 
535       next2 = scanner.next();
536       assertTrue(next2 != null);
537       assertTrue(Bytes.equals(next2.getRow(), row2));
538       assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
539 
540     } finally {
541       if (scanner != null)
542         scanner.close();
543     }
544   }
545 
546   public static class TestCoprocessorForTags extends BaseRegionObserver {
547 
548     public static boolean checkTagPresence = false;
549     public static List<Tag> tags = null;
550 
551     @Override
552     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
553         final WALEdit edit, final Durability durability) throws IOException {
554       updateMutationAddingTags(put);
555     }
556 
557     private void updateMutationAddingTags(final Mutation m) {
558       byte[] attribute = m.getAttribute("visibility");
559       byte[] cf = null;
560       List<Cell> updatedCells = new ArrayList<Cell>();
561       if (attribute != null) {
562         for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
563           for (Cell cell : edits) {
564             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
565             if (cf == null) {
566               cf = kv.getFamily();
567             }
568             Tag tag = new Tag((byte) 1, attribute);
569             List<Tag> tagList = new ArrayList<Tag>();
570             tagList.add(tag);
571 
572             KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
573                 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
574                 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
575                 kv.getValueLength(), tagList);
576             ((List<Cell>) updatedCells).add(newKV);
577           }
578         }
579         m.getFamilyCellMap().remove(cf);
580         // Update the family map
581         m.getFamilyCellMap().put(cf, updatedCells);
582       }
583     }
584 
585     @Override
586     public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
587         throws IOException {
588       updateMutationAddingTags(increment);
589       return super.preIncrement(e, increment);
590     }
591 
592     @Override
593     public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
594         throws IOException {
595       updateMutationAddingTags(append);
596       return super.preAppend(e, append);
597     }
598 
599     @Override
600     public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
601         InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
602       if (checkTagPresence) {
603         if (results.size() > 0) {
604           // Check tag presence in the 1st cell in 1st Result
605           Result result = results.get(0);
606           CellScanner cellScanner = result.cellScanner();
607           if (cellScanner.advance()) {
608             Cell cell = cellScanner.current();
609             tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
610                 cell.getTagsLengthUnsigned());
611           }
612         }
613       }
614       return hasMore;
615     }
616   }
617 }