1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.io.IOUtils;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.commons.logging.impl.Log4JLogger;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataInputStream;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HColumnDescriptor;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
44 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.FSUtils;
47 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
48 import org.apache.log4j.Level;
49 import org.junit.BeforeClass;
50 import org.junit.Test;
51 import org.junit.experimental.categories.Category;
52
53
54
55
56 @Category(MediumTests.class)
57 public class TestHLogReaderOnSecureHLog {
58 static final Log LOG = LogFactory.getLog(TestHLogReaderOnSecureHLog.class);
59 static {
60 ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
61 .getLogger().setLevel(Level.ALL);
62 };
63 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
64 final byte[] value = Bytes.toBytes("Test value");
65
66 @BeforeClass
67 public static void setUpBeforeClass() throws Exception {
68 Configuration conf = TEST_UTIL.getConfiguration();
69 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
70 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
71 conf.setBoolean("hbase.hlog.split.skip.errors", true);
72 conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
73 }
74
75 private Path writeWAL(String tblName, boolean encrypt) throws IOException {
76 Configuration conf = TEST_UTIL.getConfiguration();
77 String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
78 conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
79 WALCellCodec.class);
80 if (encrypt) {
81 conf.set("hbase.regionserver.wal.encryption", "true");
82 } else {
83 conf.set("hbase.regionserver.wal.encryption", "false");
84 }
85 TableName tableName = TableName.valueOf(tblName);
86 HTableDescriptor htd = new HTableDescriptor(tableName);
87 htd.addFamily(new HColumnDescriptor(tableName.getName()));
88 HRegionInfo regioninfo = new HRegionInfo(tableName,
89 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
90 final int total = 10;
91 final byte[] row = Bytes.toBytes("row");
92 final byte[] family = Bytes.toBytes("family");
93 FileSystem fs = TEST_UTIL.getTestFileSystem();
94 Path logDir = TEST_UTIL.getDataTestDir(tblName);
95 final AtomicLong sequenceId = new AtomicLong(1);
96
97
98 FSHLog wal = new FSHLog(fs, TEST_UTIL.getDataTestDir(), logDir.toString(), conf);
99 for (int i = 0; i < total; i++) {
100 WALEdit kvs = new WALEdit();
101 kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
102 wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
103 }
104 final Path walPath = ((FSHLog) wal).computeFilename();
105 wal.close();
106
107 conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
108
109 return walPath;
110 }
111
112 @Test()
113 public void testHLogReaderOnSecureHLog() throws Exception {
114 Configuration conf = TEST_UTIL.getConfiguration();
115 HLogFactory.resetLogReaderClass();
116 HLogFactory.resetLogWriterClass();
117 conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
118 HLog.Reader.class);
119 conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
120 HLog.Writer.class);
121 FileSystem fs = TEST_UTIL.getTestFileSystem();
122 Path walPath = writeWAL("testHLogReaderOnSecureHLog", true);
123
124
125 long length = fs.getFileStatus(walPath).getLen();
126 FSDataInputStream in = fs.open(walPath);
127 byte[] fileData = new byte[(int)length];
128 IOUtils.readFully(in, fileData);
129 in.close();
130 assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
131
132
133 try {
134 HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf);
135 assertFalse(true);
136 } catch (IOException ioe) {
137
138 }
139
140 FileStatus[] listStatus = fs.listStatus(walPath.getParent());
141 RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
142 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
143 Path rootdir = FSUtils.getRootDir(conf);
144 try {
145 HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
146 s.splitLogFile(listStatus[0], null);
147 Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
148 "corrupt");
149 assertTrue(fs.exists(file));
150
151 } catch (IOException ioe) {
152 assertTrue("WAL should have been sidelined", false);
153 }
154 }
155
156 @Test()
157 public void testSecureHLogReaderOnHLog() throws Exception {
158 Configuration conf = TEST_UTIL.getConfiguration();
159 HLogFactory.resetLogReaderClass();
160 HLogFactory.resetLogWriterClass();
161 conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
162 HLog.Reader.class);
163 conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
164 HLog.Writer.class);
165 FileSystem fs = TEST_UTIL.getTestFileSystem();
166 Path walPath = writeWAL("testSecureHLogReaderOnHLog", false);
167
168
169 long length = fs.getFileStatus(walPath).getLen();
170 FSDataInputStream in = fs.open(walPath);
171 byte[] fileData = new byte[(int)length];
172 IOUtils.readFully(in, fileData);
173 in.close();
174 assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
175
176
177 try {
178 HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf);
179 } catch (IOException ioe) {
180 assertFalse(true);
181 }
182
183 FileStatus[] listStatus = fs.listStatus(walPath.getParent());
184 RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
185 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
186 Path rootdir = FSUtils.getRootDir(conf);
187 try {
188 HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
189 s.splitLogFile(listStatus[0], null);
190 Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
191 "corrupt");
192 assertTrue(!fs.exists(file));
193 } catch (IOException ioe) {
194 assertTrue("WAL should have been processed", false);
195 }
196 }
197 }