View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.nio.ByteBuffer;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.List;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FSDataInputStream;
35  import org.apache.hadoop.hbase.codec.Codec;
36  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37  import org.apache.hadoop.hbase.io.LimitInputStream;
38  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
39  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
40  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
41  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
42  import org.apache.hadoop.hbase.util.Bytes;
43  
44  import com.google.protobuf.CodedInputStream;
45  import com.google.protobuf.InvalidProtocolBufferException;
46  
47  /**
48   * A Protobuf based WAL has the following structure:
49   * <p>
50   * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
51   * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
52   * </p>
53   * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
54   * {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure
55   * which is appended at the end of the WAL. This is empty for now; it can contain some meta
56   * information such as Region level stats, etc in future.
57   */
58  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
59  public class ProtobufLogReader extends ReaderBase {
60    private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
61    static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
62    static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
63    protected FSDataInputStream inputStream;
64    protected Codec.Decoder cellDecoder;
65    protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
66    protected boolean hasCompression = false;
67    protected boolean hasTagCompression = false;
68    // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
69    // in the hlog, the inputstream's position is equal to walEditsStopOffset.
70    private long walEditsStopOffset;
71    private boolean trailerPresent;
72    private static List<String> writerClsNames = new ArrayList<String>();
73    static {
74      writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
75    }
76  
77    enum WALHdrResult {
78      EOF,                   // stream is at EOF when method starts
79      SUCCESS,
80      UNKNOWN_WRITER_CLS     // name of writer class isn't recognized
81    }
82    
83    // context for WALHdr carrying information such as Cell Codec classname
84    static class WALHdrContext {
85      WALHdrResult result;
86      String cellCodecClsName;
87      
88      WALHdrContext(WALHdrResult result, String cellCodecClsName) {
89        this.result = result;
90        this.cellCodecClsName = cellCodecClsName;
91      }
92      WALHdrResult getResult() {
93        return result;
94      }
95      String getCellCodecClsName() {
96        return cellCodecClsName;
97      }
98    }
99  
100   public ProtobufLogReader() {
101     super();
102   }
103 
104   @Override
105   public void close() throws IOException {
106     if (this.inputStream != null) {
107       this.inputStream.close();
108       this.inputStream = null;
109     }
110   }
111 
112   @Override
113   public long getPosition() throws IOException {
114     return inputStream.getPos();
115   }
116 
117   @Override
118   public void reset() throws IOException {
119     String clsName = initInternal(null, false);
120     initAfterCompression(clsName); // We need a new decoder (at least).
121   }
122 
123   @Override
124   protected String initReader(FSDataInputStream stream) throws IOException {
125     return initInternal(stream, true);
126   }
127 
128   /*
129    * Returns names of the accepted writer classes
130    */
131   protected List<String> getWriterClsNames() {
132     return writerClsNames;
133   }
134 
135   protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
136       throws IOException {
137      boolean res = builder.mergeDelimitedFrom(stream);
138      if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
139      if (builder.hasWriterClsName() &&
140          !getWriterClsNames().contains(builder.getWriterClsName())) {
141        return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
142      }
143      String clsName = null;
144      if (builder.hasCellCodecClsName()) {
145        clsName = builder.getCellCodecClsName();
146      }
147      return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
148   }
149 
150   private String initInternal(FSDataInputStream stream, boolean isFirst)
151       throws IOException {
152     close();
153     long expectedPos = PB_WAL_MAGIC.length;
154     if (stream == null) {
155       stream = fs.open(path);
156       stream.seek(expectedPos);
157     }
158     if (stream.getPos() != expectedPos) {
159       throw new IOException("The stream is at invalid position: " + stream.getPos());
160     }
161     // Initialize metadata or, when we reset, just skip the header.
162     WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
163     WALHdrContext hdrCtxt = readHeader(builder, stream);
164     WALHdrResult walHdrRes = hdrCtxt.getResult();
165     if (walHdrRes == WALHdrResult.EOF) {
166       throw new EOFException("Couldn't read WAL PB header");
167     }
168     if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
169       throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
170     }
171     if (isFirst) {
172       WALProtos.WALHeader header = builder.build();
173       this.hasCompression = header.hasHasCompression() && header.getHasCompression();
174       this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
175     }
176     this.inputStream = stream;
177     this.walEditsStopOffset = this.fileLength;
178     long currentPosition = stream.getPos();
179     trailerPresent = setTrailerIfPresent();
180     this.seekOnFs(currentPosition);
181     if (LOG.isTraceEnabled()) {
182       LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
183           + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
184     }
185     return hdrCtxt.getCellCodecClsName();
186   }
187 
188   /**
189    * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
190    * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
191    * the trailer, and checks whether the trailer is present at the end or not by comparing the last
192    * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
193    * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
194    * before the trailer.
195    * <ul>
196    * The trailer is ignored in case:
197    * <li>fileLength is 0 or not correct (when file is under recovery, etc).
198    * <li>the trailer size is negative.
199    * </ul>
200    * <p>
201    * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
202    * @return true if a valid trailer is present
203    * @throws IOException
204    */
205   private boolean setTrailerIfPresent() {
206     try {
207       long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
208       if (trailerSizeOffset <= 0) return false;// no trailer possible.
209       this.seekOnFs(trailerSizeOffset);
210       // read the int as trailer size.
211       int trailerSize = this.inputStream.readInt();
212       ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
213       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
214       if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
215         LOG.trace("No trailer found.");
216         return false;
217       }
218       if (trailerSize < 0) {
219         LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
220         return false;
221       } else if (trailerSize > this.trailerWarnSize) {
222         // continue reading after warning the user.
223         LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
224           + trailerSize + " > " + this.trailerWarnSize);
225       }
226       // seek to the position where trailer starts.
227       long positionOfTrailer = trailerSizeOffset - trailerSize;
228       this.seekOnFs(positionOfTrailer);
229       // read the trailer.
230       buf = ByteBuffer.allocate(trailerSize);// for trailer.
231       this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
232       trailer = WALTrailer.parseFrom(buf.array());
233       this.walEditsStopOffset = positionOfTrailer;
234       return true;
235     } catch (IOException ioe) {
236       LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
237     }
238     return false;
239   }
240 
241   protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
242       CompressionContext compressionContext) throws IOException {
243     return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
244   }
245 
246   @Override
247   protected void initAfterCompression() throws IOException {
248     initAfterCompression(null);
249   }
250   
251   @Override
252   protected void initAfterCompression(String cellCodecClsName) throws IOException {
253     WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
254     this.cellDecoder = codec.getDecoder(this.inputStream);
255     if (this.hasCompression) {
256       this.byteStringUncompressor = codec.getByteStringUncompressor();
257     }
258   }
259 
260   @Override
261   protected boolean hasCompression() {
262     return this.hasCompression;
263   }
264 
265   @Override
266   protected boolean hasTagCompression() {
267     return this.hasTagCompression;
268   }
269 
270   @Override
271   protected boolean readNext(HLog.Entry entry) throws IOException {
272     while (true) {
273       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
274       long originalPosition = this.inputStream.getPos();
275       if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
276         return false;
277       }
278       WALKey.Builder builder = WALKey.newBuilder();
279       long size = 0;
280       try {
281         long available = -1;
282         try {
283           int firstByte = this.inputStream.read();
284           if (firstByte == -1) {
285             throw new EOFException("First byte is negative");
286           }
287           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
288           // available may be < 0 on local fs for instance.  If so, can't depend on it.
289           available = this.inputStream.available();
290           if (available > 0 && available < size) {
291             throw new EOFException("Available stream not enough for edit, " +
292                 "inputStream.available()= " + this.inputStream.available() + ", " +
293                 "entry size= " + size);
294           }
295           final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
296           builder.mergeFrom(limitedInput);
297         } catch (InvalidProtocolBufferException ipbe) {
298           throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
299             originalPosition + ", currentPosition=" + this.inputStream.getPos() +
300             ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
301         }
302         if (!builder.isInitialized()) {
303           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
304           //       If we can get the KV count, we could, theoretically, try to get next record.
305           throw new EOFException("Partial PB while reading WAL, " +
306               "probably an unexpected EOF, ignoring");
307         }
308         WALKey walKey = builder.build();
309         entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
310         if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
311           LOG.trace("WALKey has no KVs that follow it; trying the next one");
312           continue;
313         }
314         int expectedCells = walKey.getFollowingKvCount();
315         long posBefore = this.inputStream.getPos();
316         try {
317           int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
318           if (expectedCells != actualCells) {
319             throw new EOFException("Only read " + actualCells); // other info added in catch
320           }
321         } catch (Exception ex) {
322           String posAfterStr = "<unknown>";
323           try {
324             posAfterStr = this.inputStream.getPos() + "";
325           } catch (Throwable t) {
326             LOG.trace("Error getting pos for error message - ignoring", t);
327           }
328           String message = " while reading " + expectedCells + " WAL KVs; started reading at "
329               + posBefore + " and read up to " + posAfterStr;
330           IOException realEofEx = extractHiddenEof(ex);
331           throw (EOFException) new EOFException("EOF " + message).
332               initCause(realEofEx != null ? realEofEx : ex);
333         }
334         if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
335           LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
336               + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
337               + this.walEditsStopOffset);
338           throw new EOFException("Read WALTrailer while reading WALEdits");
339         }
340       } catch (EOFException eof) {
341         LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
342         // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
343         if (originalPosition < 0) throw eof;
344         // Else restore our position to original location in hope that next time through we will
345         // read successfully.
346         seekOnFs(originalPosition);
347         return false;
348       }
349       return true;
350     }
351   }
352 
353   private IOException extractHiddenEof(Exception ex) {
354     // There are two problems we are dealing with here. Hadoop stream throws generic exception
355     // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
356     IOException ioEx = null;
357     if (ex instanceof EOFException) {
358       return (EOFException)ex;
359     } else if (ex instanceof IOException) {
360       ioEx = (IOException)ex;
361     } else if (ex instanceof RuntimeException
362         && ex.getCause() != null && ex.getCause() instanceof IOException) {
363       ioEx = (IOException)ex.getCause();
364     }
365     if (ioEx != null) {
366       if (ioEx.getMessage().contains("EOF")) return ioEx;
367       return null;
368     }
369     return null;
370   }
371 
372   @Override
373   public WALTrailer getWALTrailer() {
374     return trailer;
375   }
376 
377   @Override
378   protected void seekOnFs(long pos) throws IOException {
379     this.inputStream.seek(pos);
380   }
381 }