1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.nio.ByteBuffer;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FSDataInputStream;
35 import org.apache.hadoop.hbase.codec.Codec;
36 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
37 import org.apache.hadoop.hbase.io.LimitInputStream;
38 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
39 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
40 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
41 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
42 import org.apache.hadoop.hbase.util.Bytes;
43
44 import com.google.protobuf.CodedInputStream;
45 import com.google.protobuf.InvalidProtocolBufferException;
46
47
48
49
50
51
52
53
54
55
56
57
58 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
59 public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);

/** Magic bytes of a protobuf WAL; the header is expected to start right after them. */
static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");

/** Magic bytes looked for in the last bytes of the file to detect a trailer. */
static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");

/** Stream the WAL is read from; {@code null} after {@link #close()}. */
protected FSDataInputStream inputStream;

/** Decoder used to read the cells that follow each WALKey. */
protected Codec.Decoder cellDecoder;

/** Populated only when {@link #hasCompression} is true. */
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;

/** Compression flag taken from the WAL header on the first initialization. */
protected boolean hasCompression = false;

/** Tag-compression flag taken from the WAL header on the first initialization. */
protected boolean hasTagCompression = false;

/** Offset where WAL edits end: the file length, or the trailer start when a trailer was parsed. */
private long walEditsStopOffset;

/** Whether the last initialization found and parsed a trailer. */
private boolean trailerPresent;

/**
 * Writer class names whose headers this reader accepts. The reference is never reassigned, so
 * it is declared final; the list itself stays mutable because it is handed out by
 * {@link #getWriterClsNames()}.
 */
private static final List<String> writerClsNames = new ArrayList<String>();
static {
  writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
}
76
/** Possible outcomes of reading the WAL header. */
enum WALHdrResult {
  /** Nothing could be read from the stream for the header. */
  EOF,
  /** The header was read and its writer class, if any, is accepted. */
  SUCCESS,
  /** The header names a writer class this reader does not accept. */
  UNKNOWN_WRITER_CLS
}
82
83
84 static class WALHdrContext {
85 WALHdrResult result;
86 String cellCodecClsName;
87
88 WALHdrContext(WALHdrResult result, String cellCodecClsName) {
89 this.result = result;
90 this.cellCodecClsName = cellCodecClsName;
91 }
92 WALHdrResult getResult() {
93 return result;
94 }
95 String getCellCodecClsName() {
96 return cellCodecClsName;
97 }
98 }
99
100 public ProtobufLogReader() {
101 super();
102 }
103
104 @Override
105 public void close() throws IOException {
106 if (this.inputStream != null) {
107 this.inputStream.close();
108 this.inputStream = null;
109 }
110 }
111
112 @Override
113 public long getPosition() throws IOException {
114 return inputStream.getPos();
115 }
116
117 @Override
118 public void reset() throws IOException {
119 String clsName = initInternal(null, false);
120 initAfterCompression(clsName);
121 }
122
123 @Override
124 protected String initReader(FSDataInputStream stream) throws IOException {
125 return initInternal(stream, true);
126 }
127
128
129
130
131 protected List<String> getWriterClsNames() {
132 return writerClsNames;
133 }
134
135 protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
136 throws IOException {
137 boolean res = builder.mergeDelimitedFrom(stream);
138 if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
139 if (builder.hasWriterClsName() &&
140 !getWriterClsNames().contains(builder.getWriterClsName())) {
141 return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
142 }
143 String clsName = null;
144 if (builder.hasCellCodecClsName()) {
145 clsName = builder.getCellCodecClsName();
146 }
147 return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
148 }
149
150 private String initInternal(FSDataInputStream stream, boolean isFirst)
151 throws IOException {
152 close();
153 long expectedPos = PB_WAL_MAGIC.length;
154 if (stream == null) {
155 stream = fs.open(path);
156 stream.seek(expectedPos);
157 }
158 if (stream.getPos() != expectedPos) {
159 throw new IOException("The stream is at invalid position: " + stream.getPos());
160 }
161
162 WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
163 WALHdrContext hdrCtxt = readHeader(builder, stream);
164 WALHdrResult walHdrRes = hdrCtxt.getResult();
165 if (walHdrRes == WALHdrResult.EOF) {
166 throw new EOFException("Couldn't read WAL PB header");
167 }
168 if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
169 throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
170 }
171 if (isFirst) {
172 WALProtos.WALHeader header = builder.build();
173 this.hasCompression = header.hasHasCompression() && header.getHasCompression();
174 this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
175 }
176 this.inputStream = stream;
177 this.walEditsStopOffset = this.fileLength;
178 long currentPosition = stream.getPos();
179 trailerPresent = setTrailerIfPresent();
180 this.seekOnFs(currentPosition);
181 if (LOG.isTraceEnabled()) {
182 LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
183 + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
184 }
185 return hdrCtxt.getCellCodecClsName();
186 }
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 private boolean setTrailerIfPresent() {
206 try {
207 long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
208 if (trailerSizeOffset <= 0) return false;
209 this.seekOnFs(trailerSizeOffset);
210
211 int trailerSize = this.inputStream.readInt();
212 ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
213 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
214 if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
215 LOG.trace("No trailer found.");
216 return false;
217 }
218 if (trailerSize < 0) {
219 LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
220 return false;
221 } else if (trailerSize > this.trailerWarnSize) {
222
223 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
224 + trailerSize + " > " + this.trailerWarnSize);
225 }
226
227 long positionOfTrailer = trailerSizeOffset - trailerSize;
228 this.seekOnFs(positionOfTrailer);
229
230 buf = ByteBuffer.allocate(trailerSize);
231 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
232 trailer = WALTrailer.parseFrom(buf.array());
233 this.walEditsStopOffset = positionOfTrailer;
234 return true;
235 } catch (IOException ioe) {
236 LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
237 }
238 return false;
239 }
240
241 protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
242 CompressionContext compressionContext) throws IOException {
243 return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
244 }
245
246 @Override
247 protected void initAfterCompression() throws IOException {
248 initAfterCompression(null);
249 }
250
251 @Override
252 protected void initAfterCompression(String cellCodecClsName) throws IOException {
253 WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
254 this.cellDecoder = codec.getDecoder(this.inputStream);
255 if (this.hasCompression) {
256 this.byteStringUncompressor = codec.getByteStringUncompressor();
257 }
258 }
259
260 @Override
261 protected boolean hasCompression() {
262 return this.hasCompression;
263 }
264
265 @Override
266 protected boolean hasTagCompression() {
267 return this.hasTagCompression;
268 }
269
270 @Override
271 protected boolean readNext(HLog.Entry entry) throws IOException {
272 while (true) {
273
274 long originalPosition = this.inputStream.getPos();
275 if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
276 return false;
277 }
278 WALKey.Builder builder = WALKey.newBuilder();
279 long size = 0;
280 try {
281 long available = -1;
282 try {
283 int firstByte = this.inputStream.read();
284 if (firstByte == -1) {
285 throw new EOFException("First byte is negative");
286 }
287 size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
288
289 available = this.inputStream.available();
290 if (available > 0 && available < size) {
291 throw new EOFException("Available stream not enough for edit, " +
292 "inputStream.available()= " + this.inputStream.available() + ", " +
293 "entry size= " + size);
294 }
295 final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
296 builder.mergeFrom(limitedInput);
297 } catch (InvalidProtocolBufferException ipbe) {
298 throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
299 originalPosition + ", currentPosition=" + this.inputStream.getPos() +
300 ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
301 }
302 if (!builder.isInitialized()) {
303
304
305 throw new EOFException("Partial PB while reading WAL, " +
306 "probably an unexpected EOF, ignoring");
307 }
308 WALKey walKey = builder.build();
309 entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
310 if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
311 LOG.trace("WALKey has no KVs that follow it; trying the next one");
312 continue;
313 }
314 int expectedCells = walKey.getFollowingKvCount();
315 long posBefore = this.inputStream.getPos();
316 try {
317 int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
318 if (expectedCells != actualCells) {
319 throw new EOFException("Only read " + actualCells);
320 }
321 } catch (Exception ex) {
322 String posAfterStr = "<unknown>";
323 try {
324 posAfterStr = this.inputStream.getPos() + "";
325 } catch (Throwable t) {
326 LOG.trace("Error getting pos for error message - ignoring", t);
327 }
328 String message = " while reading " + expectedCells + " WAL KVs; started reading at "
329 + posBefore + " and read up to " + posAfterStr;
330 IOException realEofEx = extractHiddenEof(ex);
331 throw (EOFException) new EOFException("EOF " + message).
332 initCause(realEofEx != null ? realEofEx : ex);
333 }
334 if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
335 LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
336 + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
337 + this.walEditsStopOffset);
338 throw new EOFException("Read WALTrailer while reading WALEdits");
339 }
340 } catch (EOFException eof) {
341 LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
342
343 if (originalPosition < 0) throw eof;
344
345
346 seekOnFs(originalPosition);
347 return false;
348 }
349 return true;
350 }
351 }
352
353 private IOException extractHiddenEof(Exception ex) {
354
355
356 IOException ioEx = null;
357 if (ex instanceof EOFException) {
358 return (EOFException)ex;
359 } else if (ex instanceof IOException) {
360 ioEx = (IOException)ex;
361 } else if (ex instanceof RuntimeException
362 && ex.getCause() != null && ex.getCause() instanceof IOException) {
363 ioEx = (IOException)ex.getCause();
364 }
365 if (ioEx != null) {
366 if (ioEx.getMessage().contains("EOF")) return ioEx;
367 return null;
368 }
369 return null;
370 }
371
372 @Override
373 public WALTrailer getWALTrailer() {
374 return trailer;
375 }
376
377 @Override
378 protected void seekOnFs(long pos) throws IOException {
379 this.inputStream.seek(pos);
380 }
381 }