View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.when;
28  
29  import java.io.ByteArrayOutputStream;
30  import java.io.IOException;
31  import java.io.PrintStream;
32  import java.net.URL;
33  import java.util.ArrayList;
34  import java.util.List;
35  
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.testclassification.MediumTests;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.Delete;
49  import org.apache.hadoop.hbase.client.Durability;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.HTable;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.filter.Filter;
57  import org.apache.hadoop.hbase.filter.FilterBase;
58  import org.apache.hadoop.hbase.filter.PrefixFilter;
59  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
60  import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
61  import org.apache.hadoop.hbase.regionserver.wal.HLog;
62  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
63  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
64  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
67  import org.apache.hadoop.mapreduce.Job;
68  import org.apache.hadoop.mapreduce.Mapper.Context;
69  import org.apache.hadoop.util.GenericOptionsParser;
70  import org.junit.After;
71  import org.junit.AfterClass;
72  import org.junit.Assert;
73  import org.junit.Before;
74  import org.junit.BeforeClass;
75  import org.junit.Test;
76  import org.junit.experimental.categories.Category;
77  import org.mockito.invocation.InvocationOnMock;
78  import org.mockito.stubbing.Answer;
79  
80  /**
81   * Tests the table import and table export MR job functionality
82   */
83  @Category(MediumTests.class)
84  public class TestImportExport {
85    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
86    private static final byte[] ROW1 = Bytes.toBytes("row1");
87    private static final byte[] ROW2 = Bytes.toBytes("row2");
88    private static final String FAMILYA_STRING = "a";
89    private static final String FAMILYB_STRING = "b";
90    private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
91    private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
92    private static final byte[] QUAL = Bytes.toBytes("q");
93    private static final String OUTPUT_DIR = "outputdir";
94    private static String FQ_OUTPUT_DIR;
95    private static final String EXPORT_BATCH_SIZE = "100";
96  
97    private static long now = System.currentTimeMillis();
98  
99    @BeforeClass
100   public static void beforeClass() throws Exception {
101     UTIL.startMiniCluster();
102     UTIL.startMiniMapReduceCluster();
103     FQ_OUTPUT_DIR =  new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
104   }
105 
106   @AfterClass
107   public static void afterClass() throws Exception {
108     UTIL.shutdownMiniMapReduceCluster();
109     UTIL.shutdownMiniCluster();
110   }
111 
112   @Before
113   @After
114   public void cleanup() throws Exception {
115     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
116     fs.delete(new Path(OUTPUT_DIR), true);
117   }
118 
119   /**
120    * Runs an export job with the specified command line args
121    * @param args
122    * @return true if job completed successfully
123    * @throws IOException
124    * @throws InterruptedException
125    * @throws ClassNotFoundException
126    */
127   boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
128     // need to make a copy of the configuration because to make sure different temp dirs are used.
129     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
130     Configuration conf = opts.getConfiguration();
131     args = opts.getRemainingArgs();
132     Job job = Export.createSubmittableJob(conf, args);
133     job.waitForCompletion(false);
134     return job.isSuccessful();
135   }
136 
137   /**
138    * Runs an import job with the specified command line args
139    * @param args
140    * @return true if job completed successfully
141    * @throws IOException
142    * @throws InterruptedException
143    * @throws ClassNotFoundException
144    */
145   boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
146     // need to make a copy of the configuration because to make sure different temp dirs are used.
147     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
148     Configuration conf = opts.getConfiguration();
149     args = opts.getRemainingArgs();
150     Job job = Import.createSubmittableJob(conf, args);
151     job.waitForCompletion(false);
152     return job.isSuccessful();
153   }
154 
155   /**
156    * Test simple replication case with column mapping
157    * @throws Exception
158    */
159   @Test
160   public void testSimpleCase() throws Exception {
161     String EXPORT_TABLE = "exportSimpleCase";
162     HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA, 3);
163     Put p = new Put(ROW1);
164     p.add(FAMILYA, QUAL, now, QUAL);
165     p.add(FAMILYA, QUAL, now+1, QUAL);
166     p.add(FAMILYA, QUAL, now+2, QUAL);
167     t.put(p);
168     p = new Put(ROW2);
169     p.add(FAMILYA, QUAL, now, QUAL);
170     p.add(FAMILYA, QUAL, now+1, QUAL);
171     p.add(FAMILYA, QUAL, now+2, QUAL);
172     t.put(p);
173 
174     String[] args = new String[] {
175         EXPORT_TABLE,
176         FQ_OUTPUT_DIR,
177         "1000", // max number of key versions per key to export
178     };
179     assertTrue(runExport(args));
180 
181     String IMPORT_TABLE = "importTableSimpleCase";
182     t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB, 3);
183     args = new String[] {
184         "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
185         IMPORT_TABLE,
186         FQ_OUTPUT_DIR
187     };
188     assertTrue(runImport(args));
189 
190     Get g = new Get(ROW1);
191     g.setMaxVersions();
192     Result r = t.get(g);
193     assertEquals(3, r.size());
194     g = new Get(ROW2);
195     g.setMaxVersions();
196     r = t.get(g);
197     assertEquals(3, r.size());
198   }
199 
200   /**
201    * Test export hbase:meta table
202    *
203    * @throws Exception
204    */
205   @Test
206   public void testMetaExport() throws Exception {
207     String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
208     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
209     assertTrue(runExport(args));
210   }
211 
212   /**
213    * Test import data from 0.94 exported file
214    * @throws Exception
215    */
216   @Test
217   public void testImport94Table() throws Exception {
218     URL url = TestImportExport.class.getResource(
219         "exportedTableIn94Format");
220     Path importPath = new Path(url.getPath());
221     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
222     fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
223         + "exportedTableIn94Format"));
224     String IMPORT_TABLE = "importTableExportedFrom94";
225     HTable t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
226     String[] args = new String[] {
227         "-Dhbase.import.version=0.94" ,
228         IMPORT_TABLE, FQ_OUTPUT_DIR
229     };
230     assertTrue(runImport(args));
231 
232     /* exportedTableIn94Format contains 5 rows
233      ROW         COLUMN+CELL
234      r1          column=f1:c1, timestamp=1383766761171, value=val1
235      r2          column=f1:c1, timestamp=1383766771642, value=val2
236      r3          column=f1:c1, timestamp=1383766777615, value=val3
237      r4          column=f1:c1, timestamp=1383766785146, value=val4
238      r5          column=f1:c1, timestamp=1383766791506, value=val5
239      */
240     assertEquals(5, UTIL.countRows(t));
241     t.close();
242   }
243 
244   /**
245    * Test export scanner batching
246    */
247    @Test
248    public void testExportScannerBatching() throws Exception {
249     String BATCH_TABLE = "exportWithBatch";
250     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
251     desc.addFamily(new HColumnDescriptor(FAMILYA)
252         .setMaxVersions(1)
253     );
254     UTIL.getHBaseAdmin().createTable(desc);
255     HTable t = new HTable(UTIL.getConfiguration(), BATCH_TABLE);
256 
257     Put p = new Put(ROW1);
258     p.add(FAMILYA, QUAL, now, QUAL);
259     p.add(FAMILYA, QUAL, now+1, QUAL);
260     p.add(FAMILYA, QUAL, now+2, QUAL);
261     p.add(FAMILYA, QUAL, now+3, QUAL);
262     p.add(FAMILYA, QUAL, now+4, QUAL);
263     t.put(p);
264 
265     String[] args = new String[] {
266         "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
267         BATCH_TABLE,
268         FQ_OUTPUT_DIR
269     };
270     assertTrue(runExport(args));
271 
272     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
273     fs.delete(new Path(FQ_OUTPUT_DIR), true);
274     t.close();
275   }
276 
277   @Test
278   public void testWithDeletes() throws Exception {
279     String EXPORT_TABLE = "exportWithDeletes";
280     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
281     desc.addFamily(new HColumnDescriptor(FAMILYA)
282         .setMaxVersions(5)
283         .setKeepDeletedCells(true)
284     );
285     UTIL.getHBaseAdmin().createTable(desc);
286     HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
287 
288     Put p = new Put(ROW1);
289     p.add(FAMILYA, QUAL, now, QUAL);
290     p.add(FAMILYA, QUAL, now+1, QUAL);
291     p.add(FAMILYA, QUAL, now+2, QUAL);
292     p.add(FAMILYA, QUAL, now+3, QUAL);
293     p.add(FAMILYA, QUAL, now+4, QUAL);
294     t.put(p);
295 
296     Delete d = new Delete(ROW1, now+3);
297     t.delete(d);
298     d = new Delete(ROW1);
299     d.deleteColumns(FAMILYA, QUAL, now+2);
300     t.delete(d);
301 
302     String[] args = new String[] {
303         "-D" + Export.RAW_SCAN + "=true",
304         EXPORT_TABLE,
305         FQ_OUTPUT_DIR,
306         "1000", // max number of key versions per key to export
307     };
308     assertTrue(runExport(args));
309 
310     String IMPORT_TABLE = "importWithDeletes";
311     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
312     desc.addFamily(new HColumnDescriptor(FAMILYA)
313         .setMaxVersions(5)
314         .setKeepDeletedCells(true)
315     );
316     UTIL.getHBaseAdmin().createTable(desc);
317     t.close();
318     t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
319     args = new String[] {
320         IMPORT_TABLE,
321         FQ_OUTPUT_DIR
322     };
323     assertTrue(runImport(args));
324 
325     Scan s = new Scan();
326     s.setMaxVersions();
327     s.setRaw(true);
328     ResultScanner scanner = t.getScanner(s);
329     Result r = scanner.next();
330     Cell[] res = r.rawCells();
331     assertTrue(CellUtil.isDeleteFamily(res[0]));
332     assertEquals(now+4, res[1].getTimestamp());
333     assertEquals(now+3, res[2].getTimestamp());
334     assertTrue(CellUtil.isDelete(res[3]));
335     assertEquals(now+2, res[4].getTimestamp());
336     assertEquals(now+1, res[5].getTimestamp());
337     assertEquals(now, res[6].getTimestamp());
338     t.close();
339   }
340   
341   
342   @Test
343   public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
344     String EXPORT_TABLE = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
345     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
346     desc.addFamily(new HColumnDescriptor(FAMILYA)
347         .setMaxVersions(5)
348         .setKeepDeletedCells(true)
349     );
350     UTIL.getHBaseAdmin().createTable(desc);
351     HTable exportT = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
352 
353     //Add first version of QUAL
354     Put p = new Put(ROW1);
355     p.add(FAMILYA, QUAL, now, QUAL);
356     exportT.put(p);
357 
358     //Add Delete family marker
359     Delete d = new Delete(ROW1, now+3);
360     exportT.delete(d);
361 
362     //Add second version of QUAL
363     p = new Put(ROW1);
364     p.add(FAMILYA, QUAL, now+5, "s".getBytes());
365     exportT.put(p);
366 
367     //Add second Delete family marker
368     d = new Delete(ROW1, now+7);
369     exportT.delete(d);
370     
371     
372     String[] args = new String[] {
373         "-D" + Export.RAW_SCAN + "=true",
374         EXPORT_TABLE,
375         FQ_OUTPUT_DIR,
376         "1000", // max number of key versions per key to export
377     };
378     assertTrue(runExport(args));
379 
380     String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
381     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
382     desc.addFamily(new HColumnDescriptor(FAMILYA)
383         .setMaxVersions(5)
384         .setKeepDeletedCells(true)
385     );
386     UTIL.getHBaseAdmin().createTable(desc);
387     
388     HTable importT = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
389     args = new String[] {
390         IMPORT_TABLE,
391         FQ_OUTPUT_DIR
392     };
393     assertTrue(runImport(args));
394 
395     Scan s = new Scan();
396     s.setMaxVersions();
397     s.setRaw(true);
398     
399     ResultScanner importedTScanner = importT.getScanner(s);
400     Result importedTResult = importedTScanner.next();
401     
402     ResultScanner exportedTScanner = exportT.getScanner(s);
403     Result  exportedTResult =  exportedTScanner.next();
404     try
405     {
406       Result.compareResults(exportedTResult, importedTResult);
407     }
408     catch (Exception e) {
409       fail("Original and imported tables data comparision failed with error:"+e.getMessage());
410     }
411     finally
412     {
413       exportT.close();
414       importT.close();
415     }
416   }
417 
418   /**
419    * Create a simple table, run an Export Job on it, Import with filtering on,  verify counts,
420    * attempt with invalid values.
421    */
422   @Test
423   public void testWithFilter() throws Exception {
424     // Create simple table to export
425     String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
426     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
427     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
428     UTIL.getHBaseAdmin().createTable(desc);
429     HTable exportTable = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
430 
431     Put p = new Put(ROW1);
432     p.add(FAMILYA, QUAL, now, QUAL);
433     p.add(FAMILYA, QUAL, now + 1, QUAL);
434     p.add(FAMILYA, QUAL, now + 2, QUAL);
435     p.add(FAMILYA, QUAL, now + 3, QUAL);
436     p.add(FAMILYA, QUAL, now + 4, QUAL);
437     exportTable.put(p);
438 
439     // Having another row would actually test the filter.
440     p = new Put(ROW2);
441     p.add(FAMILYA, QUAL, now, QUAL);
442     exportTable.put(p);
443     // Flush the commits.
444     exportTable.flushCommits();
445 
446     // Export the simple table
447     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
448     assertTrue(runExport(args));
449 
450     // Import to a new table
451     String IMPORT_TABLE = "importWithFilter";
452     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
453     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
454     UTIL.getHBaseAdmin().createTable(desc);
455 
456     HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
457     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
458         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
459         "1000" };
460     assertTrue(runImport(args));
461 
462     // get the count of the source table for that time range
463     PrefixFilter filter = new PrefixFilter(ROW1);
464     int count = getCount(exportTable, filter);
465 
466     Assert.assertEquals("Unexpected row count between export and import tables", count,
467       getCount(importTable, null));
468 
469     // and then test that a broken command doesn't bork everything - easier here because we don't
470     // need to re-run the export job
471 
472     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
473         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
474         FQ_OUTPUT_DIR, "1000" };
475     assertFalse(runImport(args));
476 
477     // cleanup
478     exportTable.close();
479     importTable.close();
480   }
481 
482   /**
483    * Count the number of keyvalues in the specified table for the given timerange
484    * @param start
485    * @param end
486    * @param table
487    * @return
488    * @throws IOException
489    */
490   private int getCount(HTable table, Filter filter) throws IOException {
491     Scan scan = new Scan();
492     scan.setFilter(filter);
493     ResultScanner results = table.getScanner(scan);
494     int count = 0;
495     for (Result res : results) {
496       count += res.size();
497     }
498     results.close();
499     return count;
500   }
501   
502   /**
503    * test main method. Import should print help and call System.exit
504    */
505   @Test
506   public void testImportMain() throws Exception {
507     PrintStream oldPrintStream = System.err;
508     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
509     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
510     System.setSecurityManager(newSecurityManager);
511     ByteArrayOutputStream data = new ByteArrayOutputStream();
512     String[] args = {};
513     System.setErr(new PrintStream(data));
514     try {
515       System.setErr(new PrintStream(data));
516       Import.main(args);
517       fail("should be SecurityException");
518     } catch (SecurityException e) {
519       assertEquals(-1, newSecurityManager.getExitCode());
520       assertTrue(data.toString().contains("Wrong number of arguments:"));
521       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
522       assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
523       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
524       assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
525     } finally {
526       System.setErr(oldPrintStream);
527       System.setSecurityManager(SECURITY_MANAGER);
528     }
529   }
530 
531   /**
532    * test main method. Export should print help and call System.exit
533    */
534   @Test
535   public void testExportMain() throws Exception {
536     PrintStream oldPrintStream = System.err;
537     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
538     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
539     System.setSecurityManager(newSecurityManager);
540     ByteArrayOutputStream data = new ByteArrayOutputStream();
541     String[] args = {};
542     System.setErr(new PrintStream(data));
543     try {
544       System.setErr(new PrintStream(data));
545       Export.main(args);
546       fail("should be SecurityException");
547     } catch (SecurityException e) {
548       assertEquals(-1, newSecurityManager.getExitCode());
549       assertTrue(data.toString().contains("Wrong number of arguments:"));
550       assertTrue(data.toString().contains(
551               "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
552               "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
553       assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
554       assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
555       assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
556       assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
557       assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
558       assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
559     } finally {
560       System.setErr(oldPrintStream);
561       System.setSecurityManager(SECURITY_MANAGER);
562     }
563   }
564 
565   /**
566    * Test map method of Importer
567    */
568   @SuppressWarnings({ "unchecked", "rawtypes" })
569   @Test
570   public void testKeyValueImporter() throws Exception {
571     KeyValueImporter importer = new KeyValueImporter();
572     Configuration configuration = new Configuration();
573     Context ctx = mock(Context.class);
574     when(ctx.getConfiguration()).thenReturn(configuration);
575 
576     doAnswer(new Answer<Void>() {
577 
578       @Override
579       public Void answer(InvocationOnMock invocation) throws Throwable {
580         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
581         KeyValue key = (KeyValue) invocation.getArguments()[1];
582         assertEquals("Key", Bytes.toString(writer.get()));
583         assertEquals("row", Bytes.toString(key.getRow()));
584         return null;
585       }
586     }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
587 
588     importer.setup(ctx);
589     Result value = mock(Result.class);
590     KeyValue[] keys = {
591         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
592             Bytes.toBytes("value")),
593         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
594             Bytes.toBytes("value1")) };
595     when(value.rawCells()).thenReturn(keys);
596     importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
597 
598   }
599 
600   /**
601    * Test addFilterAndArguments method of Import This method set couple
602    * parameters into Configuration
603    */
604   @Test
605   public void testAddFilterAndArguments() throws IOException {
606     Configuration configuration = new Configuration();
607 
608     List<String> args = new ArrayList<String>();
609     args.add("param1");
610     args.add("param2");
611 
612     Import.addFilterAndArguments(configuration, FilterBase.class, args);
613     assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 
614         configuration.get(Import.FILTER_CLASS_CONF_KEY));
615     assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
616   }
617 
618   @Test
619   public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
620     // Create an export table.
621     String exportTableName = "exporttestDurability";
622     HTable exportTable = UTIL.createTable(Bytes.toBytes(exportTableName), FAMILYA, 3);
623 
624     // Insert some data
625     Put put = new Put(ROW1);
626     put.add(FAMILYA, QUAL, now, QUAL);
627     put.add(FAMILYA, QUAL, now + 1, QUAL);
628     put.add(FAMILYA, QUAL, now + 2, QUAL);
629     exportTable.put(put);
630 
631     put = new Put(ROW2);
632     put.add(FAMILYA, QUAL, now, QUAL);
633     put.add(FAMILYA, QUAL, now + 1, QUAL);
634     put.add(FAMILYA, QUAL, now + 2, QUAL);
635     exportTable.put(put);
636 
637     // Run the export
638     String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
639     assertTrue(runExport(args));
640 
641     // Create the table for import
642     String importTableName = "importTestDurability1";
643     HTable importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
644 
645     // Register the hlog listener for the import table
646     TableWALActionListener walListener = new TableWALActionListener(importTableName);
647     HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
648     hLog.registerWALActionsListener(walListener);
649 
650     // Run the import with SKIP_WAL
651     args =
652         new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
653             importTableName, FQ_OUTPUT_DIR };
654     assertTrue(runImport(args));
655     //Assert that the wal is not visisted
656     assertTrue(!walListener.isWALVisited());
657     //Ensure that the count is 2 (only one version of key value is obtained)
658     assertTrue(getCount(importTable, null) == 2);
659 
660     // Run the import with the default durability option
661     importTableName = "importTestDurability2";
662     importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
663     hLog.unregisterWALActionsListener(walListener);    
664     walListener = new TableWALActionListener(importTableName);
665     hLog.registerWALActionsListener(walListener);
666     args = new String[] { importTableName, FQ_OUTPUT_DIR };
667     assertTrue(runImport(args));
668     //Assert that the wal is visisted
669     assertTrue(walListener.isWALVisited());
670     //Ensure that the count is 2 (only one version of key value is obtained)
671     assertTrue(getCount(importTable, null) == 2);
672   }
673 
674   /**
675    * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)} to
676    * identify that an entry is written to the Write Ahead Log for the given table.
677    */
678   private static class TableWALActionListener implements WALActionsListener {
679 
680     private String tableName;
681     private boolean isVisited = false;
682 
683     public TableWALActionListener(String tableName) {
684       this.tableName = tableName;
685     }
686 
687     @Override
688     public void preLogRoll(Path oldPath, Path newPath) throws IOException {
689       // Not interested in this method.
690     }
691 
692     @Override
693     public void postLogRoll(Path oldPath, Path newPath) throws IOException {
694       // Not interested in this method.
695     }
696 
697     @Override
698     public void preLogArchive(Path oldPath, Path newPath) throws IOException {
699       // Not interested in this method.
700     }
701 
702     @Override
703     public void postLogArchive(Path oldPath, Path newPath) throws IOException {
704       // Not interested in this method.
705     }
706 
707     @Override
708     public void logRollRequested(boolean tooFewReplicas) {
709       // Not interested in this method.
710     }
711 
712     @Override
713     public void logCloseRequested() {
714       // Not interested in this method.
715     }
716 
717     @Override
718     public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
719       // Not interested in this method.
720     }
721 
722     @Override
723     public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
724       if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
725         isVisited = true;
726       }
727     }
728 
729     public boolean isWALVisited() {
730       return isVisited;
731     }
732   }  
733 }