/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test cases for the "load" half of the HFileOutputFormat bulk-load
 * functionality. These tests run faster than the full MapReduce cluster
 * tests in TestHFileOutputFormat.
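 *
 * <p>A minimal sketch of the entry point these cases exercise, as it appears
 * in the tests below (the path and table name here are illustrative):
 * <pre>
 *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
 *   loader.run(new String[] { "/staging/dir/of/hfiles", "myTable" });
 * </pre>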
 */
@Category(LargeTests.class)
public class TestLoadIncrementalHFiles {
  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  private static final String NAMESPACE = "bulkNS";

  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  private static final byte[][] SPLIT_KEYS = new byte[][] {
    Bytes.toBytes("ddd"),
    Bytes.toBytes("ppp")
  };

  static HBaseTestingUtility util = new HBaseTestingUtility();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.getConfiguration().setInt(
      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
      MAX_FILES_PER_REGION_PER_FAMILY);
    util.startMiniCluster();

    setupNamespace();
  }

  protected static void setupNamespace() throws Exception {
    util.getHBaseAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Test case that creates some regions and loads
   * HFiles that fit snugly inside those regions.
   */
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad", BloomType.NONE,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
    });
  }

  /**
   * Test case that creates some regions and loads
   * HFiles that cross the boundaries of those regions.
   */
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad", BloomType.NONE,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }

  /**
   * Test loading into a column family that has a ROW bloom filter.
   */
  @Test
  public void testRegionCrossingRowBloom() throws Exception {
    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }

  /**
   * Test loading into a column family that has a ROWCOL bloom filter.
   */
  @Test
  public void testRegionCrossingRowColBloom() throws Exception {
    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }

  /**
   * Test case that creates some regions and loads HFiles whose
   * boundaries differ from those the table was pre-split on.
   */
  @Test
  public void testSimpleHFileSplit() throws Exception {
    runTest("testHFileSplit", BloomType.NONE,
        new byte[][] {
          Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
        },
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
          new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
        }
    );
  }

  /**
   * Test case that creates some regions and loads HFiles that cross region
   * boundaries and whose boundaries differ from those the table was
   * pre-split on.
   */
  @Test
  public void testRegionCrossingHFileSplit() throws Exception {
    testRegionCrossingHFileSplit(BloomType.NONE);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross region
   * boundaries, have a ROW bloom filter, and whose boundaries differ from
   * those the table was pre-split on.
   */
  @Test
  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROW);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross region
   * boundaries, have a ROWCOL bloom filter, and whose boundaries differ from
   * those the table was pre-split on.
   */
  @Test
  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROWCOL);
  }

  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
        new byte[][] {
          Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
        },
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
        }
    );
  }

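  /**
   * The runTest overloads below funnel into a single driver: write one HFile
   * per requested key range, optionally pre-create the table, invoke
   * LoadIncrementalHFiles, and assert on the resulting row count. Each test
   * name is run twice, once against the default namespace and once against
   * the {@code bulkNS} namespace created in setup.
   */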
  private void runTest(String testName, BloomType bloomType,
      byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  private void runTest(String testName, BloomType bloomType,
      byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    final boolean preCreateTable = tableSplitKeys != null;

    // Run the test bulk-loading the table into the default namespace
    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);

    // Run the test bulk-loading the table into the specified namespace
    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
  }

  private void runTest(String testName, TableName tableName, BloomType bloomType,
      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
    familyDesc.setBloomFilterType(bloomType);
    htd.addFamily(familyDesc);
    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
  }

  private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
    }
    int expectedRows = hfileIdx * 1000;

    if (preCreateTable) {
      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
    }

    final TableName tableName = htd.getTableName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    HTable table = new HTable(util.getConfiguration(), tableName);
    try {
      assertEquals(expectedRows, util.countRows(table));
    } finally {
      table.close();
    }

    // verify that the staging folder has been cleaned up
    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
    if (fs.exists(stagingBasePath)) {
      FileStatus[] files = fs.listStatus(stagingBasePath);
      for (FileStatus file : files) {
        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
          !file.getPath().getName().equals("DONOTERASE"));
      }
    }

    util.deleteTable(tableName);
  }

  /**
   * Test loading into a column family that does not exist.
   */
  @Test
  public void testNonexistentColumnFamilyLoad() throws Exception {
    String testName = "testNonexistentColumnFamilyLoad";
    byte[][][] hFileRanges = new byte[][][] {
      new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
      new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
    };

    final byte[] TABLE = Bytes.toBytes("mytable_" + testName);
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
    // upper-case the real family name on purpose, to simulate HFiles whose
    // family name does not match any family in the table
    HColumnDescriptor family =
        new HColumnDescriptor(Bytes.toBytes(Bytes.toString(FAMILY).toUpperCase()));
    htd.addFamily(family);

    try {
      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
      fail("Loading into table with non-existent family should have failed");
    } catch (Exception e) {
      assertTrue("IOException expected", e instanceof IOException);
      // further check whether the exception message is correct
      String errMsg = e.getMessage();
      assertTrue("Incorrect exception message, expected message: ["
          + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
          errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
    }
  }

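  /**
   * The two cases below drive {@link #testNonHfileFolder(String, boolean)},
   * once with the table pre-created and once letting the loader create it.
   */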
  @Test(timeout = 60000)
  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
  }

  @Test(timeout = 60000)
  public void testNonHfileFolder() throws Exception {
    testNonHfileFolder("testNonHfileFolder", false);
  }

  /**
   * Write a random data file and a non-file into a dir whose name is a valid
   * family name but is not one of the table's families. We should be able to
   * bulk-load without hitting the unmatched-family exception.
   * HBASE-13037/HBASE-13227.
   */
  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tableName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);

    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
        FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);

    final String NON_FAMILY_FOLDER = "_logs";
    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
    fs.mkdirs(nonFamilyDir);
    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);

    HTable table = null;
    try {
      if (preCreateTable) {
        table = util.createTable(TableName.valueOf(tableName), FAMILY);
      } else {
        table = new HTable(util.getConfiguration(), TableName.valueOf(tableName));
      }

      final String[] args = { dir.toString(), tableName };
      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
      assertEquals(500, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
      fs.delete(dir, true);
    }
  }

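  /**
   * Writes {@code size} bytes of a repeating 0x00..0xff pattern to
   * {@code path}, giving the loader a plausible-looking non-HFile to skip.
   */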
  private static void createRandomDataFile(FileSystem fs, Path path, int size)
      throws IOException {
    FSDataOutputStream stream = fs.create(path);
    try {
      byte[] data = new byte[1024];
      for (int i = 0; i < data.length; ++i) {
        data[i] = (byte) (i & 0xff);
      }
      while (size >= data.length) {
        stream.write(data, 0, data.length);
        size -= data.length;
      }
      if (size > 0) {
        stream.write(data, 0, size);
      }
    } finally {
      stream.close();
    }
  }

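  /**
   * Split a single HFile spanning aaa..zzz at row "ggg" and verify that the
   * bottom and top halves together still hold all 1000 rows.
   */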
  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(
        util.getConfiguration(), testIn,
        familyDesc, Bytes.toBytes("ggg"),
        bottomOut,
        topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

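  /**
   * Opens the HFile at {@code p}, scans it end to end, and returns the cell
   * count, asserting that the file is non-empty.
   */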
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(
        p.getFileSystem(conf), p, new CacheConfig(conf), conf);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    assertTrue(count > 0);
    reader.close();
    return count;
  }

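  /**
   * Records one key range for {@link #testInferBoundaries()}: +1 at the start
   * key and -1 at the end key. A running sum over the sorted keys then drops
   * to zero exactly where one contiguous group of ranges ends and the next
   * begins; see the toy example in the test below.
   */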
  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
    Integer value = map.containsKey(first) ? map.get(first) : 0;
    map.put(first, value + 1);

    value = map.containsKey(last) ? map.get(last) : 0;
    map.put(last, value - 1);
  }

  @Test
  public void testInferBoundaries() {
    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

    /* Toy example
     *     c---------i            o------p          s---------t     v------x
     * a------e    g-----k   m-------------q   r----s            u----w
     *
     * Should be inferred as:
     * a-----------------k   m-------------q   r--------------t  u---------x
     *
     * The output should be (m,r,u)
     */

    String first;
    String last;

    first = "a"; last = "e";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "r"; last = "s";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "o"; last = "p";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "g"; last = "k";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "v"; last = "x";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "c"; last = "i";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "m"; last = "q";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "s"; last = "t";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "u"; last = "w";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
    byte[][] compare = new byte[3][];
    compare[0] = "m".getBytes();
    compare[1] = "r".getBytes();
    compare[2] = "u".getBytes();

    assertEquals(3, keysArray.length);

    for (int row = 0; row < keysArray.length; row++) {
      assertArrayEquals(compare[row], keysArray[row]);
    }
  }

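  /**
   * Loading more than MAX_FILES_PER_REGION_PER_FAMILY HFiles into a single
   * region and family should be rejected with an IOException.
   */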
  @Test
  public void testLoadTooManyHFiles() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testLoadTooManyHFiles");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
          + i), FAMILY, QUALIFIER, from, to, 1000);
    }

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), "mytable_testLoadTooManyHFiles" };
    try {
      loader.run(args);
      fail("Bulk loading too many files should fail");
    } catch (IOException ie) {
      assertTrue(ie.getMessage().contains("Trying to load more than "
        + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
    }
  }

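  /**
   * With {@code LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY} set to "no",
   * loading into a missing table should surface a TableNotFoundException
   * instead of creating the table on the fly.
   */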
  @Test(expected = TableNotFoundException.class)
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { "directory", "nonExistingTable" };
    loader.run(args);
  }

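  /**
   * A column family whose name starts with an underscore looks like an
   * internal directory (cf. the "_logs" folder used above), so make sure such
   * a family still bulk-loads correctly.
   */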
  @Test
  public void testTableWithCFNameStartWithUnderScore() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String family = "_cf";
    Path familyDir = new Path(dir, family);

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    Configuration conf = util.getConfiguration();
    String tableName = "mytable_cfNameStartWithUnderScore";
    HTable table = util.createTable(tableName, family);
    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
      QUALIFIER, from, to, 1000);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { dir.toString(), tableName };
    try {
      loader.run(args);
      assertEquals(1000, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
}