View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  // this is deliberately not in the o.a.h.h.regionserver package
20  // in order to make sure all required classes/methods are available
21  
22  import static org.junit.Assert.assertEquals;
23  
24  import java.io.IOException;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableSet;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HColumnDescriptor;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HTableDescriptor;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValueUtil;
43  import org.apache.hadoop.hbase.testclassification.MediumTests;
44  import org.apache.hadoop.hbase.client.Get;
45  import org.apache.hadoop.hbase.client.HTable;
46  import org.apache.hadoop.hbase.client.IsolationLevel;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.client.Result;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.client.Durability;
51  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
53  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
54  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
55  import org.apache.hadoop.hbase.regionserver.HStore;
56  import org.apache.hadoop.hbase.regionserver.InternalScanner;
57  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
58  import org.apache.hadoop.hbase.regionserver.ScanType;
59  import org.apache.hadoop.hbase.regionserver.Store;
60  import org.apache.hadoop.hbase.regionserver.ScanInfo;
61  import org.apache.hadoop.hbase.regionserver.StoreScanner;
62  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
63  import org.junit.AfterClass;
64  import org.junit.BeforeClass;
65  import org.junit.Test;
66  import org.junit.experimental.categories.Category;
67  
68  import org.junit.runner.RunWith;
69  import org.junit.runners.Parameterized;
70  import org.junit.runners.Parameterized.Parameters;
71  
72  @Category(MediumTests.class)
73  @RunWith(Parameterized.class)
74  public class TestCoprocessorScanPolicy {
75    final Log LOG = LogFactory.getLog(getClass());
76    protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
77    private static final byte[] F = Bytes.toBytes("fam");
78    private static final byte[] Q = Bytes.toBytes("qual");
79    private static final byte[] R = Bytes.toBytes("row");
80  
81    @BeforeClass
82    public static void setUpBeforeClass() throws Exception {
83      Configuration conf = TEST_UTIL.getConfiguration();
84      conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
85          ScanObserver.class.getName());
86      TEST_UTIL.startMiniCluster();
87    }
88  
89    @AfterClass
90    public static void tearDownAfterClass() throws Exception {
91      TEST_UTIL.shutdownMiniCluster();
92    }
93  
94    @Parameters
95    public static Collection<Object[]> parameters() {
96      return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
97    }
98  
99    public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
100     TEST_UTIL.getMiniHBaseCluster().getConf()
101         .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
102   }
103 
104   @Test
105   public void testBaseCases() throws Exception {
106     TableName tableName =
107         TableName.valueOf("baseCases");
108     if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
109       TEST_UTIL.deleteTable(tableName);
110     }
111     HTable t = TEST_UTIL.createTable(tableName, F, 1);
112     // set the version override to 2
113     Put p = new Put(R);
114     p.setAttribute("versions", new byte[]{});
115     p.add(F, tableName.getName(), Bytes.toBytes(2));
116     t.put(p);
117 
118     long now = EnvironmentEdgeManager.currentTimeMillis();
119 
120     // insert 2 versions
121     p = new Put(R);
122     p.add(F, Q, now, Q);
123     t.put(p);
124     p = new Put(R);
125     p.add(F, Q, now+1, Q);
126     t.put(p);
127     Get g = new Get(R);
128     g.setMaxVersions(10);
129     Result r = t.get(g);
130     assertEquals(2, r.size());
131 
132     TEST_UTIL.flush(tableName);
133     TEST_UTIL.compact(tableName, true);
134 
135     // both version are still visible even after a flush/compaction
136     g = new Get(R);
137     g.setMaxVersions(10);
138     r = t.get(g);
139     assertEquals(2, r.size());
140 
141     // insert a 3rd version
142     p = new Put(R);
143     p.add(F, Q, now+2, Q);
144     t.put(p);
145     g = new Get(R);
146     g.setMaxVersions(10);
147     r = t.get(g);
148     // still only two version visible
149     assertEquals(2, r.size());
150 
151     t.close();
152   }
153 
154   @Test
155   public void testTTL() throws Exception {
156     TableName tableName =
157         TableName.valueOf("testTTL");
158     if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
159       TEST_UTIL.deleteTable(tableName);
160     }
161     HTableDescriptor desc = new HTableDescriptor(tableName);
162     HColumnDescriptor hcd = new HColumnDescriptor(F)
163     .setMaxVersions(10)
164     .setTimeToLive(1);
165     desc.addFamily(hcd);
166     TEST_UTIL.getHBaseAdmin().createTable(desc);
167     HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
168     long now = EnvironmentEdgeManager.currentTimeMillis();
169     ManualEnvironmentEdge me = new ManualEnvironmentEdge();
170     me.setValue(now);
171     EnvironmentEdgeManagerTestHelper.injectEdge(me);
172     // 2s in the past
173     long ts = now - 2000;
174     // Set the TTL override to 3s
175     Put p = new Put(R);
176     p.setAttribute("ttl", new byte[]{});
177     p.add(F, tableName.getName(), Bytes.toBytes(3000L));
178     t.put(p);
179 
180     p = new Put(R);
181     p.add(F, Q, ts, Q);
182     t.put(p);
183     p = new Put(R);
184     p.add(F, Q, ts+1, Q);
185     t.put(p);
186 
187     // these two should be expired but for the override
188     // (their ts was 2s in the past)
189     Get g = new Get(R);
190     g.setMaxVersions(10);
191     Result r = t.get(g);
192     // still there?
193     assertEquals(2, r.size());
194 
195     TEST_UTIL.flush(tableName);
196     TEST_UTIL.compact(tableName, true);
197 
198     g = new Get(R);
199     g.setMaxVersions(10);
200     r = t.get(g);
201     // still there?
202     assertEquals(2, r.size());
203 
204     // roll time forward 2s.
205     me.setValue(now + 2000);
206     // now verify that data eventually does expire
207     g = new Get(R);
208     g.setMaxVersions(10);
209     r = t.get(g);
210     // should be gone now
211     assertEquals(0, r.size());
212     t.close();
213   }
214 
215   public static class ScanObserver extends BaseRegionObserver {
216     private Map<TableName, Long> ttls =
217         new HashMap<TableName, Long>();
218     private Map<TableName, Integer> versions =
219         new HashMap<TableName, Integer>();
220 
221     // lame way to communicate with the coprocessor,
222     // since it is loaded by a different class loader
223     @Override
224     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
225         final WALEdit edit, final Durability durability) throws IOException {
226       if (put.getAttribute("ttl") != null) {
227         Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
228         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
229         ttls.put(TableName.valueOf(kv.getQualifier()), Bytes.toLong(kv.getValue()));
230         c.bypass();
231       } else if (put.getAttribute("versions") != null) {
232         Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
233         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
234         versions.put(TableName.valueOf(kv.getQualifier()), Bytes.toInt(kv.getValue()));
235         c.bypass();
236       }
237     }
238 
239     @Override
240     public InternalScanner preFlushScannerOpen(
241         final ObserverContext<RegionCoprocessorEnvironment> c,
242         Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
243       Long newTtl = ttls.get(store.getTableName());
244       if (newTtl != null) {
245         System.out.println("PreFlush:" + newTtl);
246       }
247       Integer newVersions = versions.get(store.getTableName());
248       ScanInfo oldSI = store.getScanInfo();
249       HColumnDescriptor family = store.getFamily();
250       ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
251           newVersions == null ? family.getMaxVersions() : newVersions,
252           newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
253           oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
254       Scan scan = new Scan();
255       scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
256       return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
257           ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
258           HConstants.OLDEST_TIMESTAMP);
259     }
260 
261     @Override
262     public InternalScanner preCompactScannerOpen(
263         final ObserverContext<RegionCoprocessorEnvironment> c,
264         Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
265         long earliestPutTs, InternalScanner s) throws IOException {
266       Long newTtl = ttls.get(store.getTableName());
267       Integer newVersions = versions.get(store.getTableName());
268       ScanInfo oldSI = store.getScanInfo();
269       HColumnDescriptor family = store.getFamily();
270       ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
271           newVersions == null ? family.getMaxVersions() : newVersions,
272           newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
273           oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
274       Scan scan = new Scan();
275       scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
276       return new StoreScanner(store, scanInfo, scan, scanners, scanType,
277           store.getSmallestReadPoint(), earliestPutTs);
278     }
279 
280     @Override
281     public KeyValueScanner preStoreScannerOpen(
282         final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
283         final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
284       Long newTtl = ttls.get(store.getTableName());
285       Integer newVersions = versions.get(store.getTableName());
286       ScanInfo oldSI = store.getScanInfo();
287       HColumnDescriptor family = store.getFamily();
288       ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
289           newVersions == null ? family.getMaxVersions() : newVersions,
290           newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
291           oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
292       return new StoreScanner(store, scanInfo, scan, targetCols,
293         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
294     }
295   }
296 
297 }