/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link HLog.Writer} that delegates to
 * SequenceFile.Writer. A legacy implementation, retained only for
 * compatibility tests.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SequenceFileLogWriter extends WriterBase {
  private final Log LOG = LogFactory.getLog(this.getClass());
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;
  // This is the FSDataOutputStream instance that is the 'out' instance
  // in the SequenceFile.Writer 'writer' instance above.
  private FSDataOutputStream writer_out;

  // Legacy stuff from pre-PB WAL metadata.
  private static final Text WAL_VERSION_KEY = new Text("version");
  private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
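  // These entries end up in the SequenceFile header; a reader can inspect
  // them to learn the (legacy) WAL version and whether the entries were
  // written with dictionary compression.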

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * Create sequence file Metadata for our WAL file with version and compression
   * type (if any).
   * @param conf configuration (currently unused by this method)
   * @param compress whether dictionary compression is enabled for this WAL
   * @return Metadata instance.
   */
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, new Text("1"));
    if (compress) {
      // Currently we only do one compression type.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
    return new Metadata(metaMap);
  }
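  // For illustration: a reader-side consumer could recover these values via
  // SequenceFile.Reader#getMetadata(), e.g. (hypothetical snippet):
  //   Metadata md = reader.getMetadata();
  //   Text version = md.get(WAL_VERSION_KEY);        // "1"
  //   Text type = md.get(WAL_COMPRESSION_TYPE_KEY);  // "dictionary" when compressed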

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
      throws IOException {
    super.init(fs, path, conf, overwritable);
    boolean compress = initializeCompressionContext(conf, path);
    // Create a SF.Writer instance.
    try {
      // reflection for a version of SequenceFile.createWriter that doesn't
      // automatically create the parent directory (see HBASE-2312)
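      // (this is the overload introduced in HADOOP-6840; its trailing
      // createParent boolean is passed as false below, so a missing parent
      // directory surfaces as an error instead of being silently created)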
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
            Integer.valueOf(FSUtils.getDefaultBufferSize(fs)),
            Short.valueOf((short)
              conf.getInt("hbase.regionserver.hlog.replication",
              FSUtils.getDefaultReplication(fs, path))),
            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
                FSUtils.getDefaultBlockSize(fs, path))),
            Boolean.valueOf(false) /*createParent*/,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            createMetadata(conf, compress)
            });
    } catch (InvocationTargetException ite) {
      // function was properly called, but threw its own exception
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // Ignore all other exceptions; they indicate the reflective lookup
      // failed, and we fall back to the old createWriter below.
    }

    // if reflection failed, use the old createWriter
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        HLogKey.class, WALEdit.class,
        FSUtils.getDefaultBufferSize(fs),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          FSUtils.getDefaultReplication(fs, path)),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          FSUtils.getDefaultBlockSize(fs, path)),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null,
        createMetadata(conf, compress));
    } else {
      if (LOG.isTraceEnabled()) LOG.trace("Using new createWriter -- HADOOP-6840");
    }
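    // Note: both paths pass CompressionType.NONE -- the SequenceFile itself is
    // never block/record compressed. WAL compression, when enabled, is the
    // dictionary scheme applied inside the keys/edits via CompressionContext.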

    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    if (LOG.isTraceEnabled()) LOG.trace("Path=" + path + ", compression=" + compress);
  }

  // Get at the private FSDataOutputStream inside SequenceFile.Writer so we
  // can call sync on it.  Make it accessible.
  private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible()
      throws IOException {
    FSDataOutputStream out = null;
    final Field[] fields = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
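    // NOTE: "out" is a private implementation detail of SequenceFile.Writer;
    // this lookup is best-effort and yields null if the field is ever renamed.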
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the 'out' field up in SF.Writer accessible.
          fields[i].setAccessible(true);
          out = (FSDataOutputStream)fields[i].get(this.writer);
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        } catch (SecurityException e) {
          LOG.warn("Does not have access to the 'out' field in SequenceFile.Writer",
              e);
        }
      }
    }
    return out;
  }

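  // Each appended entry is handed the shared CompressionContext so that
  // HLogKey/WALEdit can apply dictionary compression (when enabled) as they
  // serialize themselves into the SequenceFile.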
  @Override
  public void append(HLog.Entry entry) throws IOException {
    entry.setCompressionContext(compressionContext);
    try {
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.writer != null) {
      try {
        this.writer.close();
      } catch (NullPointerException npe) {
        // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
        LOG.warn(npe);
      }
      this.writer = null;
    }
  }

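  // syncFs() flushes the underlying output stream so that edits appended so
  // far reach the filesystem; actual durability depends on the sync/hflush
  // semantics of the underlying Hadoop version.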
  @Override
  public void sync() throws IOException {
    try {
      this.writer.syncFs();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public long getLength() throws IOException {
    try {
      return this.writer.getLength();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  /**
   * @return the FSDataOutputStream inside SequenceFile.Writer, made accessible
   * via reflection, or null if it could not be obtained.
   */
  public FSDataOutputStream getWriterFSDataOutputStream() {
    return this.writer_out;
  }
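
  // For illustration only (hypothetical test usage): exposing the stream lets
  // a compatibility test inspect low-level writer state, e.g.
  //   long pos = writer.getWriterFSDataOutputStream().getPos(); // bytes written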

  /**
   * No-op: a trailer is written only by the protobuf-based WAL
   * readers/writers.
   */
  @Override
  public void setWALTrailer(WALTrailer walTrailer) {
  }
}