View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.ByteArrayOutputStream;
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.security.SecureRandom;
27  
28  import org.apache.commons.io.IOUtils;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.codec.KeyValueCodec;
34  import org.apache.hadoop.hbase.io.crypto.Decryptor;
35  import org.apache.hadoop.hbase.io.crypto.Encryption;
36  import org.apache.hadoop.hbase.io.crypto.Encryptor;
37  import org.apache.hadoop.hbase.io.util.StreamUtils;
38  import org.apache.hadoop.hbase.util.Bytes;
39  
40  /**
41   * A WALCellCodec that encrypts the WALedits.
42   */
43  @InterfaceAudience.Private
44  public class SecureWALCellCodec extends WALCellCodec {
45  
46    private Encryptor encryptor;
47    private Decryptor decryptor;
48  
49    public SecureWALCellCodec(Configuration conf, CompressionContext compression) {
50      super(conf, compression);
51    }
52  
53    public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
54      super(conf, null);
55      this.encryptor = encryptor;
56    }
57  
58    public SecureWALCellCodec(Configuration conf, Decryptor decryptor) {
59      super(conf, null);
60      this.decryptor = decryptor;
61    }
62  
63    static class EncryptedKvDecoder extends KeyValueCodec.KeyValueDecoder {
64  
65      private Decryptor decryptor;
66      private byte[] iv;
67  
68      public EncryptedKvDecoder(InputStream in) {
69        super(in);
70      }
71  
72      public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
73        super(in);
74        this.decryptor = decryptor;
75        if (decryptor != null) {
76          this.iv = new byte[decryptor.getIvLength()];
77        }
78      }
79  
80      @Override
81      protected Cell parseCell() throws IOException {
82        if (this.decryptor == null) {
83          return super.parseCell();
84        }
85        int ivLength = 0;
86        try {
87          ivLength = StreamUtils.readRawVarint32(in);
88        } catch (EOFException e) {
89          // EOF at start is OK
90          return null;
91        }
92  
93        // TODO: An IV length of 0 could signify an unwrapped cell, when the
94        // encoder supports that just read the remainder in directly
95  
96        if (ivLength != this.iv.length) {
97          throw new IOException("Incorrect IV length: expected=" + iv.length + " have=" +
98            ivLength);
99        }
100       IOUtils.readFully(in, this.iv);
101 
102       int codedLength = StreamUtils.readRawVarint32(in);
103       byte[] codedBytes = new byte[codedLength];
104       IOUtils.readFully(in, codedBytes);
105 
106       decryptor.setIv(iv);
107       decryptor.reset();
108 
109       InputStream cin = decryptor.createDecryptionStream(new ByteArrayInputStream(codedBytes));
110 
111       // TODO: Add support for WAL compression
112 
113       int keylength = StreamUtils.readRawVarint32(cin);
114       int vlength = StreamUtils.readRawVarint32(cin);
115       int tagsLength = StreamUtils.readRawVarint32(cin);
116       int length = 0;
117       if (tagsLength == 0) {
118         length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
119       } else {
120         length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
121       }
122 
123       byte[] backingArray = new byte[length];
124       int pos = 0;
125       pos = Bytes.putInt(backingArray, pos, keylength);
126       pos = Bytes.putInt(backingArray, pos, vlength);
127 
128       // Row
129       int elemLen = StreamUtils.readRawVarint32(cin);
130       pos = Bytes.putShort(backingArray, pos, (short)elemLen);
131       IOUtils.readFully(cin, backingArray, pos, elemLen);
132       pos += elemLen;
133       // Family
134       elemLen = StreamUtils.readRawVarint32(cin);
135       pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
136       IOUtils.readFully(cin, backingArray, pos, elemLen);
137       pos += elemLen;
138       // Qualifier
139       elemLen = StreamUtils.readRawVarint32(cin);
140       IOUtils.readFully(cin, backingArray, pos, elemLen);
141       pos += elemLen;
142       // Remainder
143       IOUtils.readFully(cin, backingArray, pos, length - pos);
144       return new KeyValue(backingArray, 0, length);
145     }
146 
147   }
148 
149   static class EncryptedKvEncoder extends KeyValueCodec.KeyValueEncoder {
150 
151     private Encryptor encryptor;
152     private final ThreadLocal<byte[]> iv = new ThreadLocal<byte[]>() {
153       @Override
154       protected byte[] initialValue() {
155         byte[] iv = new byte[encryptor.getIvLength()];
156         new SecureRandom().nextBytes(iv);
157         return iv;
158       }
159     };
160 
161     protected byte[] nextIv() {
162       byte[] b = iv.get(), ret = new byte[b.length];
163       System.arraycopy(b, 0, ret, 0, b.length);
164       return ret;
165     }
166 
167     protected void incrementIv(int v) {
168       Encryption.incrementIv(iv.get(), 1 + (v / encryptor.getBlockSize()));
169     }
170 
171     public EncryptedKvEncoder(OutputStream os) {
172       super(os);
173     }
174 
175     public EncryptedKvEncoder(OutputStream os, Encryptor encryptor) {
176       super(os);
177       this.encryptor = encryptor;
178     }
179 
180     @Override
181     public void write(Cell cell) throws IOException {
182       if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
183       if (encryptor == null) {
184         super.write(cell);
185         return;
186       }
187 
188       KeyValue kv = (KeyValue)cell;
189       byte[] kvBuffer = kv.getBuffer();
190       int offset = kv.getOffset();
191 
192       byte[] iv = nextIv();
193       encryptor.setIv(iv);
194       encryptor.reset();
195 
196       // TODO: Check if this is a cell for an encrypted CF. If not, we can
197       // write a 0 here to signal an unwrapped cell and just dump the KV bytes
198       // afterward
199 
200       StreamUtils.writeRawVInt32(out, iv.length);
201       out.write(iv);
202 
203       // TODO: Add support for WAL compression
204 
205       ByteArrayOutputStream baos = new ByteArrayOutputStream();
206       OutputStream cout = encryptor.createEncryptionStream(baos);
207 
208       // Write the KeyValue infrastructure as VInts.
209       StreamUtils.writeRawVInt32(cout, kv.getKeyLength());
210       StreamUtils.writeRawVInt32(cout, kv.getValueLength());
211       // To support tags
212       StreamUtils.writeRawVInt32(cout, kv.getTagsLengthUnsigned());
213 
214       // Write row, qualifier, and family
215       StreamUtils.writeRawVInt32(cout, kv.getRowLength());
216       cout.write(kvBuffer, kv.getRowOffset(), kv.getRowLength());
217       StreamUtils.writeRawVInt32(cout, kv.getFamilyLength());
218       cout.write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength());
219       StreamUtils.writeRawVInt32(cout, kv.getQualifierLength());
220       cout.write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength());
221       // Write the rest
222       int pos = kv.getTimestampOffset();
223       int remainingLength = kv.getLength() + offset - pos;
224       cout.write(kvBuffer, pos, remainingLength);
225       cout.close();
226 
227       StreamUtils.writeRawVInt32(out, baos.size());
228       baos.writeTo(out);
229 
230       // Increment IV given the final payload length
231       incrementIv(baos.size());
232     }
233 
234   }
235 
236   @Override
237   public Decoder getDecoder(InputStream is) {
238     return new EncryptedKvDecoder(is, decryptor);
239   }
240 
241   @Override
242   public Encoder getEncoder(OutputStream os) {
243     return new EncryptedKvEncoder(os, encryptor);
244   }
245 
246   public static WALCellCodec getCodec(Configuration conf, Encryptor encryptor) {
247     return new SecureWALCellCodec(conf, encryptor);
248   }
249 
250   public static WALCellCodec getCodec(Configuration conf, Decryptor decryptor) {
251     return new SecureWALCellCodec(conf, decryptor);
252   }
253 
254 }