View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  
24  import java.io.ByteArrayOutputStream;
25  import java.io.PrintStream;
26  import java.util.ArrayList;
27  
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.testclassification.LargeTests;
36  import org.apache.hadoop.hbase.MiniHBaseCluster;
37  import org.apache.hadoop.hbase.client.Delete;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43  import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
44  import org.apache.hadoop.hbase.regionserver.wal.HLog;
45  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
46  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
49  import org.apache.hadoop.mapreduce.Mapper;
50  import org.apache.hadoop.mapreduce.Mapper.Context;
51  import org.junit.AfterClass;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  import org.mockito.invocation.InvocationOnMock;
56  import org.mockito.stubbing.Answer;
57  
58  import static org.mockito.Matchers.any;
59  import static org.mockito.Mockito.*;
60  
61  /**
62   * Basic test for the WALPlayer M/R tool
63   */
64  @Category(LargeTests.class)
65  public class TestWALPlayer {
66    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67    private static MiniHBaseCluster cluster;
68  
69    @BeforeClass
70    public static void beforeClass() throws Exception {
71      cluster = TEST_UTIL.startMiniCluster();
72      TEST_UTIL.startMiniMapReduceCluster();
73    }
74  
75    @AfterClass
76    public static void afterClass() throws Exception {
77      TEST_UTIL.shutdownMiniMapReduceCluster();
78      TEST_UTIL.shutdownMiniCluster();
79    }
80  
81    /**
82     * Simple end-to-end test
83     * @throws Exception
84     */
85    @Test
86    public void testWALPlayer() throws Exception {
87      final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1");
88      final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2");
89      final byte[] FAMILY = Bytes.toBytes("family");
90      final byte[] COLUMN1 = Bytes.toBytes("c1");
91      final byte[] COLUMN2 = Bytes.toBytes("c2");
92      final byte[] ROW = Bytes.toBytes("row");
93      HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
94      HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
95  
96      // put a row into the first table
97      Put p = new Put(ROW);
98      p.add(FAMILY, COLUMN1, COLUMN1);
99      p.add(FAMILY, COLUMN2, COLUMN2);
100     t1.put(p);
101     // delete one column
102     Delete d = new Delete(ROW);
103     d.deleteColumns(FAMILY, COLUMN1);
104     t1.delete(d);
105 
106     // replay the WAL, map table 1 to table 2
107     HLog log = cluster.getRegionServer(0).getWAL();
108     log.rollWriter();
109     String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
110         .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
111 
112     Configuration configuration= TEST_UTIL.getConfiguration();
113     WALPlayer player = new WALPlayer(configuration);
114     String optionName="_test_.name";
115     configuration.set(optionName, "1000");
116     player.setupTime(configuration, optionName);
117     assertEquals(1000,configuration.getLong(optionName,0));
118     assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
119         Bytes.toString(TABLENAME2) }));
120 
121     
122     // verify the WAL was player into table 2
123     Get g = new Get(ROW);
124     Result r = t2.get(g);
125     assertEquals(1, r.size());
126     assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
127   }
128 
129   /**
130    * Test HLogKeyValueMapper setup and map
131    */
132   @Test
133   public void testHLogKeyValueMapper() throws Exception {
134     Configuration configuration = new Configuration();
135     configuration.set(WALPlayer.TABLES_KEY, "table");
136     HLogKeyValueMapper mapper = new HLogKeyValueMapper();
137     HLogKey key = mock(HLogKey.class);
138     when(key.getTablename()).thenReturn(TableName.valueOf("table"));
139     @SuppressWarnings("unchecked")
140     Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
141         mock(Context.class);
142     when(context.getConfiguration()).thenReturn(configuration);
143 
144     WALEdit value = mock(WALEdit.class);
145     ArrayList<KeyValue> values = new ArrayList<KeyValue>();
146     KeyValue kv1 = mock(KeyValue.class);
147     when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
148     when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
149     values.add(kv1);
150     when(value.getKeyValues()).thenReturn(values);
151     mapper.setup(context);
152 
153     doAnswer(new Answer<Void>() {
154 
155       @Override
156       public Void answer(InvocationOnMock invocation) throws Throwable {
157         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
158         KeyValue key = (KeyValue) invocation.getArguments()[1];
159         assertEquals("row", Bytes.toString(writer.get()));
160         assertEquals("row", Bytes.toString(key.getRow()));
161         return null;
162       }
163     }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
164 
165     mapper.map(key, value, context);
166 
167   }
168 
169   /**
170    * Test main method
171    */
172   @Test
173   public void testMainMethod() throws Exception {
174 
175     PrintStream oldPrintStream = System.err;
176     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
177     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
178     System.setSecurityManager(newSecurityManager);
179     ByteArrayOutputStream data = new ByteArrayOutputStream();
180     String[] args = {};
181     System.setErr(new PrintStream(data));
182     try {
183       System.setErr(new PrintStream(data));
184       try {
185         WALPlayer.main(args);
186         fail("should be SecurityException");
187       } catch (SecurityException e) {
188         assertEquals(-1, newSecurityManager.getExitCode());
189         assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
190         assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
191             " <tables> [<tableMappings>]"));
192         assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
193       }
194 
195     } finally {
196       System.setErr(oldPrintStream);
197       System.setSecurityManager(SECURITY_MANAGER);
198     }
199 
200   }
201 
202 }