View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor.example;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.testclassification.MediumTests;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.client.coprocessor.Batch;
41  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
42  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
43  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder;
44  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
45  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
46  import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
47  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
48  import org.apache.hadoop.hbase.filter.FilterList;
49  import org.apache.hadoop.hbase.filter.FilterList.Operator;
50  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
51  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
52  import org.apache.hadoop.hbase.ipc.ServerRpcController;
53  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.junit.experimental.categories.Category;
56  
57  @Category(MediumTests.class)
58  public class TestBulkDeleteProtocol {
59    private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
60    private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
61    private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
62    private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
63    private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
64    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65  
66    // @Ignore @BeforeClass
67    public static void setupBeforeClass() throws Exception {
68      TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
69          BulkDeleteEndpoint.class.getName());
70      TEST_UTIL.startMiniCluster(2);
71    }
72  
73    // @Ignore @AfterClass
74    public static void tearDownAfterClass() throws Exception {
75      TEST_UTIL.shutdownMiniCluster();
76    }
77  
78    // @Ignore @Test
79    public void testBulkDeleteEndpoint() throws Throwable {
80      byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
81      HTable ht = createTable(tableName);
82      List<Put> puts = new ArrayList<Put>(100);
83      for (int j = 0; j < 100; j++) {
84        byte[] rowkey = Bytes.toBytes(j);
85        puts.add(createPut(rowkey, "v1"));
86      }
87      ht.put(puts);
88      // Deleting all the rows.
89      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW, null);
90      assertEquals(100, noOfRowsDeleted);
91  
92      int rows = 0;
93      for (Result result : ht.getScanner(new Scan())) {
94        rows++;
95      }
96      assertEquals(0, rows);
97      ht.close();
98    }
99  
100   // @Ignore @Test
101   public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
102       throws Throwable {
103     byte[] tableName = Bytes
104         .toBytes("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
105     HTable ht = createTable(tableName);
106     List<Put> puts = new ArrayList<Put>(100);
107     for (int j = 0; j < 100; j++) {
108       byte[] rowkey = Bytes.toBytes(j);
109       puts.add(createPut(rowkey, "v1"));
110     }
111     ht.put(puts);
112     // Deleting all the rows.
113     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
114     assertEquals(100, noOfRowsDeleted);
115 
116     int rows = 0;
117     for (Result result : ht.getScanner(new Scan())) {
118       rows++;
119     }
120     assertEquals(0, rows);
121     ht.close();
122   }
123 
124   private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
125       final DeleteType deleteType, final Long timeStamp) throws Throwable {
126     HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
127     long noOfDeletedRows = 0L;
128     Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
129       new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
130       ServerRpcController controller = new ServerRpcController();
131       BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
132         new BlockingRpcCallback<BulkDeleteResponse>();
133 
134       public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
135         Builder builder = BulkDeleteRequest.newBuilder();
136         builder.setScan(ProtobufUtil.toScan(scan));
137         builder.setDeleteType(deleteType);
138         builder.setRowBatchSize(rowBatchSize);
139         if (timeStamp != null) {
140           builder.setTimestamp(timeStamp);
141         }
142         service.delete(controller, builder.build(), rpcCallback);
143         return rpcCallback.get();
144       }
145     };
146     Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
147         .getStartRow(), scan.getStopRow(), callable);
148     for (BulkDeleteResponse response : result.values()) {
149       noOfDeletedRows += response.getRowsDeleted();
150     }
151     ht.close();
152     return noOfDeletedRows;
153   }
154 
155   // @Ignore @Test
156   public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
157     byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
158     HTable ht = createTable(tableName);
159     List<Put> puts = new ArrayList<Put>(100);
160     for (int j = 0; j < 100; j++) {
161       byte[] rowkey = Bytes.toBytes(j);
162       String value = (j % 10 == 0) ? "v1" : "v2";
163       puts.add(createPut(rowkey, value));
164     }
165     ht.put(puts);
166     Scan scan = new Scan();
167     FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
168     SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
169         CompareOp.EQUAL, Bytes.toBytes("v1"));
170     // fl.addFilter(new FirstKeyOnlyFilter());
171     fl.addFilter(scvf);
172     scan.setFilter(fl);
173     // Deleting all the rows where cf1:c1=v1
174     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
175     assertEquals(10, noOfRowsDeleted);
176 
177     int rows = 0;
178     for (Result result : ht.getScanner(new Scan())) {
179       rows++;
180     }
181     assertEquals(90, rows);
182     ht.close();
183   }
184 
185   // @Ignore @Test
186   public void testBulkDeleteColumn() throws Throwable {
187     byte[] tableName = Bytes.toBytes("testBulkDeleteColumn");
188     HTable ht = createTable(tableName);
189     List<Put> puts = new ArrayList<Put>(100);
190     for (int j = 0; j < 100; j++) {
191       byte[] rowkey = Bytes.toBytes(j);
192       String value = (j % 10 == 0) ? "v1" : "v2";
193       puts.add(createPut(rowkey, value));
194     }
195     ht.put(puts);
196     Scan scan = new Scan();
197     scan.addColumn(FAMILY1, QUALIFIER2);
198     // Delete the column cf1:col2
199     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
200     assertEquals(100, noOfRowsDeleted);
201 
202     int rows = 0;
203     for (Result result : ht.getScanner(new Scan())) {
204       assertEquals(2, result.getFamilyMap(FAMILY1).size());
205       assertTrue(result.getColumnCells(FAMILY1, QUALIFIER2).isEmpty());
206       assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER1).size());
207       assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER3).size());
208       rows++;
209     }
210     assertEquals(100, rows);
211     ht.close();
212   }
213 
214   // @Ignore @Test
215   public void testBulkDeleteFamily() throws Throwable {
216     byte[] tableName = Bytes.toBytes("testBulkDeleteFamily");
217     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
218     htd.addFamily(new HColumnDescriptor(FAMILY1));
219     htd.addFamily(new HColumnDescriptor(FAMILY2));
220     TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
221     HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
222     List<Put> puts = new ArrayList<Put>(100);
223     for (int j = 0; j < 100; j++) {
224       Put put = new Put(Bytes.toBytes(j));
225       put.add(FAMILY1, QUALIFIER1, "v1".getBytes());
226       put.add(FAMILY2, QUALIFIER2, "v2".getBytes());
227       puts.add(put);
228     }
229     ht.put(puts);
230     Scan scan = new Scan();
231     scan.addFamily(FAMILY1);
232     // Delete the column family cf1
233     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
234     assertEquals(100, noOfRowsDeleted);
235     int rows = 0;
236     for (Result result : ht.getScanner(new Scan())) {
237       assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
238       assertEquals(1, result.getColumnCells(FAMILY2, QUALIFIER2).size());
239       rows++;
240     }
241     assertEquals(100, rows);
242     ht.close();
243   }
244 
245   // @Ignore @Test
246   public void testBulkDeleteColumnVersion() throws Throwable {
247     byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersion");
248     HTable ht = createTable(tableName);
249     List<Put> puts = new ArrayList<Put>(100);
250     for (int j = 0; j < 100; j++) {
251       Put put = new Put(Bytes.toBytes(j));
252       byte[] value = "v1".getBytes();
253       put.add(FAMILY1, QUALIFIER1, 1234L, value);
254       put.add(FAMILY1, QUALIFIER2, 1234L, value);
255       put.add(FAMILY1, QUALIFIER3, 1234L, value);
256       // Latest version values
257       value = "v2".getBytes();
258       put.add(FAMILY1, QUALIFIER1, value);
259       put.add(FAMILY1, QUALIFIER2, value);
260       put.add(FAMILY1, QUALIFIER3, value);
261       put.add(FAMILY1, null, value);
262       puts.add(put);
263     }
264     ht.put(puts);
265     Scan scan = new Scan();
266     scan.addFamily(FAMILY1);
267     // Delete the latest version values of all the columns in family cf1.
268     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
269         HConstants.LATEST_TIMESTAMP);
270     assertEquals(100, noOfRowsDeleted);
271     int rows = 0;
272     scan = new Scan();
273     scan.setMaxVersions();
274     for (Result result : ht.getScanner(scan)) {
275       assertEquals(3, result.getFamilyMap(FAMILY1).size());
276       List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER1);
277       assertEquals(1, column.size());
278       assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
279 
280       column = result.getColumnCells(FAMILY1, QUALIFIER2);
281       assertEquals(1, column.size());
282       assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
283 
284       column = result.getColumnCells(FAMILY1, QUALIFIER3);
285       assertEquals(1, column.size());
286       assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
287       rows++;
288     }
289     assertEquals(100, rows);
290     ht.close();
291   }
292 
293   // @Ignore @Test
294   public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
295     byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersionBasedOnTS");
296     HTable ht = createTable(tableName);
297     List<Put> puts = new ArrayList<Put>(100);
298     for (int j = 0; j < 100; j++) {
299       Put put = new Put(Bytes.toBytes(j));
300       // TS = 1000L
301       byte[] value = "v1".getBytes();
302       put.add(FAMILY1, QUALIFIER1, 1000L, value);
303       put.add(FAMILY1, QUALIFIER2, 1000L, value);
304       put.add(FAMILY1, QUALIFIER3, 1000L, value);
305       // TS = 1234L
306       value = "v2".getBytes();
307       put.add(FAMILY1, QUALIFIER1, 1234L, value);
308       put.add(FAMILY1, QUALIFIER2, 1234L, value);
309       put.add(FAMILY1, QUALIFIER3, 1234L, value);
310       // Latest version values
311       value = "v3".getBytes();
312       put.add(FAMILY1, QUALIFIER1, value);
313       put.add(FAMILY1, QUALIFIER2, value);
314       put.add(FAMILY1, QUALIFIER3, value);
315       puts.add(put);
316     }
317     ht.put(puts);
318     Scan scan = new Scan();
319     scan.addColumn(FAMILY1, QUALIFIER3);
320     // Delete the column cf1:c3's one version at TS=1234
321     long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
322     assertEquals(100, noOfRowsDeleted);
323     int rows = 0;
324     scan = new Scan();
325     scan.setMaxVersions();
326     for (Result result : ht.getScanner(scan)) {
327       assertEquals(3, result.getFamilyMap(FAMILY1).size());
328       assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER1).size());
329       assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER2).size());
330       List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER3);
331       assertEquals(2, column.size());
332       assertTrue(CellUtil.matchingValue(column.get(0), "v3".getBytes()));
333       assertTrue(CellUtil.matchingValue(column.get(1), "v1".getBytes()));
334       rows++;
335     }
336     assertEquals(100, rows);
337     ht.close();
338   }
339 
340   // @Ignore @Test
341   public void testBulkDeleteWithNumberOfVersions() throws Throwable {
342     byte[] tableName = Bytes.toBytes("testBulkDeleteWithNumberOfVersions");
343     HTable ht = createTable(tableName);
344     List<Put> puts = new ArrayList<Put>(100);
345     for (int j = 0; j < 100; j++) {
346       Put put = new Put(Bytes.toBytes(j));
347       // TS = 1000L
348       byte[] value = "v1".getBytes();
349       put.add(FAMILY1, QUALIFIER1, 1000L, value);
350       put.add(FAMILY1, QUALIFIER2, 1000L, value);
351       put.add(FAMILY1, QUALIFIER3, 1000L, value);
352       // TS = 1234L
353       value = "v2".getBytes();
354       put.add(FAMILY1, QUALIFIER1, 1234L, value);
355       put.add(FAMILY1, QUALIFIER2, 1234L, value);
356       put.add(FAMILY1, QUALIFIER3, 1234L, value);
357       // TS = 2000L
358       value = "v3".getBytes();
359       put.add(FAMILY1, QUALIFIER1, 2000L, value);
360       put.add(FAMILY1, QUALIFIER2, 2000L, value);
361       put.add(FAMILY1, QUALIFIER3, 2000L, value);
362       // Latest version values
363       value = "v4".getBytes();
364       put.add(FAMILY1, QUALIFIER1, value);
365       put.add(FAMILY1, QUALIFIER2, value);
366       put.add(FAMILY1, QUALIFIER3, value);
367       puts.add(put);
368     }
369     ht.put(puts);
370 
371     // Delete all the versions of columns cf1:c1 and cf1:c2 falling with the time range
372     // [1000,2000)
373     final Scan scan = new Scan();
374     scan.addColumn(FAMILY1, QUALIFIER1);
375     scan.addColumn(FAMILY1, QUALIFIER2);
376     scan.setTimeRange(1000L, 2000L);
377     scan.setMaxVersions();
378 
379     long noOfDeletedRows = 0L;
380     long noOfVersionsDeleted = 0L;
381     Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
382       new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
383       ServerRpcController controller = new ServerRpcController();
384       BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
385         new BlockingRpcCallback<BulkDeleteResponse>();
386 
387       public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
388         Builder builder = BulkDeleteRequest.newBuilder();
389         builder.setScan(ProtobufUtil.toScan(scan));
390         builder.setDeleteType(DeleteType.VERSION);
391         builder.setRowBatchSize(500);
392         service.delete(controller, builder.build(), rpcCallback);
393         return rpcCallback.get();
394       }
395     };
396     Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
397         .getStartRow(), scan.getStopRow(), callable);
398     for (BulkDeleteResponse response : result.values()) {
399       noOfDeletedRows += response.getRowsDeleted();
400       noOfVersionsDeleted += response.getVersionsDeleted();
401     }
402     assertEquals(100, noOfDeletedRows);
403     assertEquals(400, noOfVersionsDeleted);
404 
405     int rows = 0;
406     Scan scan1 = new Scan();
407     scan1.setMaxVersions();
408     for (Result res : ht.getScanner(scan1)) {
409       assertEquals(3, res.getFamilyMap(FAMILY1).size());
410       List<Cell> column = res.getColumnCells(FAMILY1, QUALIFIER1);
411       assertEquals(2, column.size());
412       assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
413       assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
414       column = res.getColumnCells(FAMILY1, QUALIFIER2);
415       assertEquals(2, column.size());
416       assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
417       assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
418       assertEquals(4, res.getColumnCells(FAMILY1, QUALIFIER3).size());
419       rows++;
420     }
421     assertEquals(100, rows);
422     ht.close();
423   }
424 
425   private HTable createTable(byte[] tableName) throws IOException {
426     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
427     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
428     hcd.setMaxVersions(10);// Just setting 10 as I am not testing with more than 10 versions here
429     htd.addFamily(hcd);
430     TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
431     HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
432     return ht;
433   }
434 
435   private Put createPut(byte[] rowkey, String value) throws IOException {
436     Put put = new Put(rowkey);
437     put.add(FAMILY1, QUALIFIER1, value.getBytes());
438     put.add(FAMILY1, QUALIFIER2, value.getBytes());
439     put.add(FAMILY1, QUALIFIER3, value.getBytes());
440     return put;
441   }
442 }