View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.io.util.LRUDictionary;
34  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
35  import org.apache.hadoop.hbase.util.FSUtils;
36  
37  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
38  public abstract class ReaderBase implements HLog.Reader {
39    private static final Log LOG = LogFactory.getLog(ReaderBase.class);
40    protected Configuration conf;
41    protected FileSystem fs;
42    protected Path path;
43    protected long edit = 0;
44    protected long fileLength;
45    protected WALTrailer trailer;
46    // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
47    // than this size, it is written/read respectively, with a WARN message in the log.
48    protected int trailerWarnSize;
49    /**
50     * Compression context to use reading.  Can be null if no compression.
51     */
52    protected CompressionContext compressionContext = null;
53    protected boolean emptyCompressionContext = true;
54  
55    /**
56     * Default constructor.
57     */
58    public ReaderBase() {
59    }
60  
61    @Override
62    public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
63        throws IOException {
64      this.conf = conf;
65      this.path = path;
66      this.fs = fs;
67      this.fileLength = this.fs.getFileStatus(path).getLen();
68      this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
69        HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
70      String cellCodecClsName = initReader(stream);
71  
72      boolean compression = hasCompression();
73      if (compression) {
74        // If compression is enabled, new dictionaries are created here.
75        try {
76          if (compressionContext == null) {
77            compressionContext = new CompressionContext(LRUDictionary.class,
78                FSUtils.isRecoveredEdits(path), hasTagCompression());
79          } else {
80            compressionContext.clear();
81          }
82        } catch (Exception e) {
83          throw new IOException("Failed to initialize CompressionContext", e);
84        }
85      }
86      initAfterCompression(cellCodecClsName);
87    }
88  
89    @Override
90    public HLog.Entry next() throws IOException {
91      return next(null);
92    }
93  
94    @Override
95    public HLog.Entry next(HLog.Entry reuse) throws IOException {
96      HLog.Entry e = reuse;
97      if (e == null) {
98        e = new HLog.Entry(new HLogKey(), new WALEdit());
99      }
100     if (compressionContext != null) {
101       e.setCompressionContext(compressionContext);
102     }
103 
104     boolean hasEntry = false;
105     try {
106       hasEntry = readNext(e);
107     } catch (IllegalArgumentException iae) {
108       TableName tableName = e.getKey().getTablename();
109       if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
110         // It is old ROOT table edit, ignore it
111         LOG.info("Got an old ROOT edit, ignoring ");
112         return next(e);
113       }
114       else throw iae;
115     }
116     edit++;
117     if (compressionContext != null && emptyCompressionContext) {
118       emptyCompressionContext = false;
119     }
120     return hasEntry ? e : null;
121   }
122 
123   @Override
124   public void seek(long pos) throws IOException {
125     if (compressionContext != null && emptyCompressionContext) {
126       while (next() != null) {
127         if (getPosition() == pos) {
128           emptyCompressionContext = false;
129           break;
130         }
131       }
132     }
133     seekOnFs(pos);
134   }
135 
136   /**
137    * Initializes the log reader with a particular stream (may be null).
138    * Reader assumes ownership of the stream if not null and may use it. Called once.
139    * @return the class name of cell Codec, null if such information is not available
140    */
141   protected abstract String initReader(FSDataInputStream stream) throws IOException;
142 
143   /**
144    * Initializes the compression after the shared stuff has been initialized. Called once.
145    */
146   protected abstract void initAfterCompression() throws IOException;
147   
148   /**
149    * Initializes the compression after the shared stuff has been initialized. Called once.
150    * @param cellCodecClsName class name of cell Codec
151    */
152   protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
153   /**
154    * @return Whether compression is enabled for this log.
155    */
156   protected abstract boolean hasCompression();
157 
158   /**
159    * @return Whether tag compression is enabled for this log.
160    */
161   protected abstract boolean hasTagCompression();
162 
163   /**
164    * Read next entry.
165    * @param e The entry to read into.
166    * @return Whether there was anything to read.
167    */
168   protected abstract boolean readNext(HLog.Entry e) throws IOException;
169 
170   /**
171    * Performs a filesystem-level seek to a certain position in an underlying file.
172    */
173   protected abstract void seekOnFs(long pos) throws IOException;
174 
175   @Override
176   public WALTrailer getWALTrailer() {
177     return null;
178   }
179 }