View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.*;
22  
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.*;
31  import org.apache.hadoop.hbase.testclassification.LargeTests;
32  import org.apache.hadoop.hbase.util.Bytes;
33  import org.junit.After;
34  import org.junit.AfterClass;
35  import org.junit.Before;
36  import org.junit.BeforeClass;
37  import org.junit.Test;
38  import org.junit.experimental.categories.Category;
39  
40  /**
41   * Run tests related to {@link TimestampsFilter} using HBase client APIs.
42   * Sets up the HBase mini cluster once at start. Each creates a table
43   * named for the method and does its stuff against that.
44   */
45  @Category(LargeTests.class)
46  public class TestMultipleTimestamps {
47    final Log LOG = LogFactory.getLog(getClass());
48    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
49  
50    /**
51     * @throws java.lang.Exception
52     */
53    @BeforeClass
54    public static void setUpBeforeClass() throws Exception {
55      TEST_UTIL.startMiniCluster();
56    }
57  
58    /**
59     * @throws java.lang.Exception
60     */
61    @AfterClass
62    public static void tearDownAfterClass() throws Exception {
63      TEST_UTIL.shutdownMiniCluster();
64    }
65  
66    /**
67     * @throws java.lang.Exception
68     */
69    @Before
70    public void setUp() throws Exception {
71      // Nothing to do.
72    }
73  
74    /**
75     * @throws java.lang.Exception
76     */
77    @After
78    public void tearDown() throws Exception {
79      // Nothing to do.
80    }
81  
82    @Test
83    public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
84      TableName TABLE =
85          TableName.valueOf("testReseeksWithOne" +
86              "ColumnMiltipleTimestamps");
87      byte [] FAMILY = Bytes.toBytes("event_log");
88      byte [][] FAMILIES = new byte[][] { FAMILY };
89  
90      // create table; set versions to max...
91      HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
92  
93      Integer[] putRows = new Integer[] {1, 3, 5, 7};
94      Integer[] putColumns = new Integer[] { 1, 3, 5};
95      Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
96  
97      Integer[] scanRows = new Integer[] {3, 5};
98      Integer[] scanColumns = new Integer[] {3};
99      Long[] scanTimestamps = new Long[] {3L, 4L};
100     int scanMaxVersions = 2;
101 
102     put(ht, FAMILY, putRows, putColumns, putTimestamps);
103 
104     TEST_UTIL.flush(TABLE);
105 
106     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
107         scanTimestamps, scanMaxVersions);
108 
109     Cell [] kvs;
110 
111     kvs = scanner.next().rawCells();
112     assertEquals(2, kvs.length);
113     checkOneCell(kvs[0], FAMILY, 3, 3, 4);
114     checkOneCell(kvs[1], FAMILY, 3, 3, 3);
115     kvs = scanner.next().rawCells();
116     assertEquals(2, kvs.length);
117     checkOneCell(kvs[0], FAMILY, 5, 3, 4);
118     checkOneCell(kvs[1], FAMILY, 5, 3, 3);
119 
120     ht.close();
121   }
122 
123   @Test
124   public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
125     LOG.info("testReseeksWithMultipleColumnOneTimestamp");
126     TableName TABLE =
127         TableName.valueOf("testReseeksWithMultiple" +
128             "ColumnOneTimestamps");
129     byte [] FAMILY = Bytes.toBytes("event_log");
130     byte [][] FAMILIES = new byte[][] { FAMILY };
131 
132     // create table; set versions to max...
133     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
134 
135     Integer[] putRows = new Integer[] {1, 3, 5, 7};
136     Integer[] putColumns = new Integer[] { 1, 3, 5};
137     Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
138 
139     Integer[] scanRows = new Integer[] {3, 5};
140     Integer[] scanColumns = new Integer[] {3,4};
141     Long[] scanTimestamps = new Long[] {3L};
142     int scanMaxVersions = 2;
143 
144     put(ht, FAMILY, putRows, putColumns, putTimestamps);
145 
146     TEST_UTIL.flush(TABLE);
147 
148     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
149         scanTimestamps, scanMaxVersions);
150 
151     Cell[] kvs;
152 
153     kvs = scanner.next().rawCells();
154     assertEquals(1, kvs.length);
155     checkOneCell(kvs[0], FAMILY, 3, 3, 3);
156     kvs = scanner.next().rawCells();
157     assertEquals(1, kvs.length);
158     checkOneCell(kvs[0], FAMILY, 5, 3, 3);
159 
160     ht.close();
161   }
162 
163   @Test
164   public void testReseeksWithMultipleColumnMultipleTimestamp() throws
165   IOException {
166     LOG.info("testReseeksWithMultipleColumnMultipleTimestamp");
167 
168     TableName TABLE =
169         TableName.valueOf("testReseeksWithMultipleColumnMiltipleTimestamps");
170     byte [] FAMILY = Bytes.toBytes("event_log");
171     byte [][] FAMILIES = new byte[][] { FAMILY };
172 
173     // create table; set versions to max...
174     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
175 
176     Integer[] putRows = new Integer[] {1, 3, 5, 7};
177     Integer[] putColumns = new Integer[] { 1, 3, 5};
178     Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
179 
180     Integer[] scanRows = new Integer[] {5, 7};
181     Integer[] scanColumns = new Integer[] {3, 4, 5};
182     Long[] scanTimestamps = new Long[] {2l, 3L};
183     int scanMaxVersions = 2;
184 
185     put(ht, FAMILY, putRows, putColumns, putTimestamps);
186 
187     TEST_UTIL.flush(TABLE);
188     Scan scan = new Scan();
189     scan.setMaxVersions(10);
190     ResultScanner scanner = ht.getScanner(scan);
191     while (true) {
192       Result r = scanner.next();
193       if (r == null) break;
194       LOG.info("r=" + r);
195     }
196     scanner = scan(ht, FAMILY, scanRows, scanColumns, scanTimestamps, scanMaxVersions);
197 
198     Cell[] kvs;
199 
200     // This looks like wrong answer.  Should be 2.  Even then we are returning wrong result,
201     // timestamps that are 3 whereas should be 2 since min is inclusive.
202     kvs = scanner.next().rawCells();
203     assertEquals(4, kvs.length);
204     checkOneCell(kvs[0], FAMILY, 5, 3, 3);
205     checkOneCell(kvs[1], FAMILY, 5, 3, 2);
206     checkOneCell(kvs[2], FAMILY, 5, 5, 3);
207     checkOneCell(kvs[3], FAMILY, 5, 5, 2);
208     kvs = scanner.next().rawCells();
209     assertEquals(4, kvs.length);
210     checkOneCell(kvs[0], FAMILY, 7, 3, 3);
211     checkOneCell(kvs[1], FAMILY, 7, 3, 2);
212     checkOneCell(kvs[2], FAMILY, 7, 5, 3);
213     checkOneCell(kvs[3], FAMILY, 7, 5, 2);
214 
215     ht.close();
216   }
217 
218   @Test
219   public void testReseeksWithMultipleFiles() throws IOException {
220     LOG.info("testReseeksWithMultipleFiles");
221     TableName TABLE =
222         TableName.valueOf("testReseeksWithMultipleFiles");
223     byte [] FAMILY = Bytes.toBytes("event_log");
224     byte [][] FAMILIES = new byte[][] { FAMILY };
225 
226     // create table; set versions to max...
227     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
228 
229     Integer[] putRows1 = new Integer[] {1, 2, 3};
230     Integer[] putColumns1 = new Integer[] { 2, 5, 6};
231     Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
232 
233     Integer[] putRows2 = new Integer[] {6, 7};
234     Integer[] putColumns2 = new Integer[] {3, 6};
235     Long[] putTimestamps2 = new Long[] {4L, 5L};
236 
237     Integer[] putRows3 = new Integer[] {2, 3, 5};
238     Integer[] putColumns3 = new Integer[] {1, 2, 3};
239     Long[] putTimestamps3 = new Long[] {4L,8L};
240 
241 
242     Integer[] scanRows = new Integer[] {3, 5, 7};
243     Integer[] scanColumns = new Integer[] {3, 4, 5};
244     Long[] scanTimestamps = new Long[] {2l, 4L};
245     int scanMaxVersions = 5;
246 
247     put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
248     TEST_UTIL.flush(TABLE);
249     put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
250     TEST_UTIL.flush(TABLE);
251     put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
252 
253     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
254         scanTimestamps, scanMaxVersions);
255 
256     Cell[] kvs;
257 
258     kvs = scanner.next().rawCells();
259     assertEquals(2, kvs.length);
260     checkOneCell(kvs[0], FAMILY, 3, 3, 4);
261     checkOneCell(kvs[1], FAMILY, 3, 5, 2);
262 
263     kvs = scanner.next().rawCells();
264     assertEquals(1, kvs.length);
265     checkOneCell(kvs[0], FAMILY, 5, 3, 4);
266 
267     kvs = scanner.next().rawCells();
268     assertEquals(1, kvs.length);
269     checkOneCell(kvs[0], FAMILY, 6, 3, 4);
270 
271     kvs = scanner.next().rawCells();
272     assertEquals(1, kvs.length);
273     checkOneCell(kvs[0], FAMILY, 7, 3, 4);
274 
275     ht.close();
276   }
277 
278   @Test
279   public void testWithVersionDeletes() throws Exception {
280 
281     // first test from memstore (without flushing).
282     testWithVersionDeletes(false);
283 
284     // run same test against HFiles (by forcing a flush).
285     testWithVersionDeletes(true);
286   }
287 
288   public void testWithVersionDeletes(boolean flushTables) throws IOException {
289     LOG.info("testWithVersionDeletes_"+ (flushTables ? "flush" : "noflush"));
290     TableName TABLE =
291         TableName.valueOf("testWithVersionDeletes_" + (flushTables ?
292             "flush" : "noflush"));
293     byte [] FAMILY = Bytes.toBytes("event_log");
294     byte [][] FAMILIES = new byte[][] { FAMILY };
295 
296     // create table; set versions to max...
297     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
298 
299     // For row:0, col:0: insert versions 1 through 5.
300     putNVersions(ht, FAMILY, 0, 0, 1, 5);
301 
302     if (flushTables) {
303       TEST_UTIL.flush(TABLE);
304     }
305 
306     // delete version 4.
307     deleteOneVersion(ht, FAMILY, 0, 0, 4);
308 
309     // request a bunch of versions including the deleted version. We should
310     // only get back entries for the versions that exist.
311     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0,
312         Arrays.asList(2L, 3L, 4L, 5L));
313     assertEquals(3, kvs.length);
314     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
315     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
316     checkOneCell(kvs[2], FAMILY, 0, 0, 2);
317 
318     ht.close();
319   }
320 
321   @Test
322   public void testWithMultipleVersionDeletes() throws IOException {
323     LOG.info("testWithMultipleVersionDeletes");
324 
325     TableName TABLE =
326         TableName.valueOf("testWithMultipleVersionDeletes");
327     byte [] FAMILY = Bytes.toBytes("event_log");
328     byte [][] FAMILIES = new byte[][] { FAMILY };
329 
330     // create table; set versions to max...
331     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
332 
333     // For row:0, col:0: insert versions 1 through 5.
334     putNVersions(ht, FAMILY, 0, 0, 1, 5);
335 
336     TEST_UTIL.flush(TABLE);
337 
338     // delete all versions before 4.
339     deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
340 
341     // request a bunch of versions including the deleted version. We should
342     // only get back entries for the versions that exist.
343     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
344     assertEquals(0, kvs.length);
345 
346     ht.close();
347   }
348 
349   @Test
350   public void testWithColumnDeletes() throws IOException {
351     TableName TABLE =
352         TableName.valueOf("testWithColumnDeletes");
353     byte [] FAMILY = Bytes.toBytes("event_log");
354     byte [][] FAMILIES = new byte[][] { FAMILY };
355 
356     // create table; set versions to max...
357     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
358 
359     // For row:0, col:0: insert versions 1 through 5.
360     putNVersions(ht, FAMILY, 0, 0, 1, 5);
361 
362     TEST_UTIL.flush(TABLE);
363 
364     // delete all versions before 4.
365     deleteColumn(ht, FAMILY, 0, 0);
366 
367     // request a bunch of versions including the deleted version. We should
368     // only get back entries for the versions that exist.
369     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
370     assertEquals(0, kvs.length);
371 
372     ht.close();
373   }
374 
375   @Test
376   public void testWithFamilyDeletes() throws IOException {
377     TableName TABLE =
378         TableName.valueOf("testWithFamilyDeletes");
379     byte [] FAMILY = Bytes.toBytes("event_log");
380     byte [][] FAMILIES = new byte[][] { FAMILY };
381 
382     // create table; set versions to max...
383     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
384 
385     // For row:0, col:0: insert versions 1 through 5.
386     putNVersions(ht, FAMILY, 0, 0, 1, 5);
387 
388     TEST_UTIL.flush(TABLE);
389 
390     // delete all versions before 4.
391     deleteFamily(ht, FAMILY, 0);
392 
393     // request a bunch of versions including the deleted version. We should
394     // only get back entries for the versions that exist.
395     Cell kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
396     assertEquals(0, kvs.length);
397 
398     ht.close();
399   }
400 
401   /**
402    * Assert that the passed in KeyValue has expected contents for the
403    * specified row, column & timestamp.
404    */
405   private void checkOneCell(Cell kv, byte[] cf,
406       int rowIdx, int colIdx, long ts) {
407 
408     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
409 
410     assertEquals("Row mismatch which checking: " + ctx,
411         "row:"+ rowIdx, Bytes.toString(CellUtil.cloneRow(kv)));
412 
413     assertEquals("ColumnFamily mismatch while checking: " + ctx,
414         Bytes.toString(cf), Bytes.toString(CellUtil.cloneFamily(kv)));
415 
416     assertEquals("Column qualifier mismatch while checking: " + ctx,
417         "column:" + colIdx,
418         Bytes.toString(CellUtil.cloneQualifier(kv)));
419 
420     assertEquals("Timestamp mismatch while checking: " + ctx,
421         ts, kv.getTimestamp());
422 
423     assertEquals("Value mismatch while checking: " + ctx,
424         "value-version-" + ts, Bytes.toString(CellUtil.cloneValue(kv)));
425   }
426 
427   /**
428    * Uses the TimestampFilter on a Get to request a specified list of
429    * versions for the row/column specified by rowIdx & colIdx.
430    *
431    */
432   private  Cell[] getNVersions(HTable ht, byte[] cf, int rowIdx,
433       int colIdx, List<Long> versions)
434   throws IOException {
435     byte row[] = Bytes.toBytes("row:" + rowIdx);
436     byte column[] = Bytes.toBytes("column:" + colIdx);
437     Get get = new Get(row);
438     get.addColumn(cf, column);
439     get.setMaxVersions();
440     get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
441     Result result = ht.get(get);
442 
443     return result.rawCells();
444   }
445 
446   private  ResultScanner scan(HTable ht, byte[] cf,
447       Integer[] rowIndexes, Integer[] columnIndexes,
448       Long[] versions, int maxVersions)
449   throws IOException {
450     Arrays.asList(rowIndexes);
451     byte startRow[] = Bytes.toBytes("row:" +
452         Collections.min( Arrays.asList(rowIndexes)));
453     byte endRow[] = Bytes.toBytes("row:" +
454         Collections.max( Arrays.asList(rowIndexes))+1);
455     Scan scan = new Scan(startRow, endRow);
456     for (Integer colIdx: columnIndexes) {
457       byte column[] = Bytes.toBytes("column:" + colIdx);
458       scan.addColumn(cf, column);
459     }
460     scan.setMaxVersions(maxVersions);
461     scan.setTimeRange(Collections.min(Arrays.asList(versions)),
462         Collections.max(Arrays.asList(versions))+1);
463     ResultScanner scanner = ht.getScanner(scan);
464     return scanner;
465   }
466 
467   private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
468       Integer[] columnIndexes, Long[] versions)
469   throws IOException {
470     for (int rowIdx: rowIndexes) {
471       byte row[] = Bytes.toBytes("row:" + rowIdx);
472       Put put = new Put(row);
473       put.setDurability(Durability.SKIP_WAL);
474       for(int colIdx: columnIndexes) {
475         byte column[] = Bytes.toBytes("column:" + colIdx);
476         for (long version: versions) {
477           put.add(cf, column, version, Bytes.toBytes("value-version-" +
478               version));
479         }
480       }
481       ht.put(put);
482     }
483   }
484 
485   /**
486    * Insert in specific row/column versions with timestamps
487    * versionStart..versionEnd.
488    */
489   private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
490       long versionStart, long versionEnd)
491   throws IOException {
492     byte row[] = Bytes.toBytes("row:" + rowIdx);
493     byte column[] = Bytes.toBytes("column:" + colIdx);
494     Put put = new Put(row);
495     put.setDurability(Durability.SKIP_WAL);
496 
497     for (long idx = versionStart; idx <= versionEnd; idx++) {
498       put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
499     }
500 
501     ht.put(put);
502   }
503 
504   /**
505    * For row/column specified by rowIdx/colIdx, delete the cell
506    * corresponding to the specified version.
507    */
508   private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
509       int colIdx, long version)
510   throws IOException {
511     byte row[] = Bytes.toBytes("row:" + rowIdx);
512     byte column[] = Bytes.toBytes("column:" + colIdx);
513     Delete del = new Delete(row);
514     del.deleteColumn(cf, column, version);
515     ht.delete(del);
516   }
517 
518   /**
519    * For row/column specified by rowIdx/colIdx, delete all cells
520    * preceeding the specified version.
521    */
522   private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
523       int colIdx, long version)
524   throws IOException {
525     byte row[] = Bytes.toBytes("row:" + rowIdx);
526     byte column[] = Bytes.toBytes("column:" + colIdx);
527     Delete del = new Delete(row);
528     del.deleteColumns(cf, column, version);
529     ht.delete(del);
530   }
531 
532   private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
533     byte row[] = Bytes.toBytes("row:" + rowIdx);
534     byte column[] = Bytes.toBytes("column:" + colIdx);
535     Delete del = new Delete(row);
536     del.deleteColumns(cf, column);
537     ht.delete(del);
538   }
539 
540   private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
541     byte row[] = Bytes.toBytes("row:" + rowIdx);
542     Delete del = new Delete(row);
543     del.deleteFamily(cf);
544     ht.delete(del);
545   }
546 
547 }
548 
549