1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.io.util.LRUDictionary;
34 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
35 import org.apache.hadoop.hbase.util.FSUtils;
36
37 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
38 public abstract class ReaderBase implements HLog.Reader {
39 private static final Log LOG = LogFactory.getLog(ReaderBase.class);
40 protected Configuration conf;
41 protected FileSystem fs;
42 protected Path path;
43 protected long edit = 0;
44 protected long fileLength;
45 protected WALTrailer trailer;
46
47
48 protected int trailerWarnSize;
49
50
51
52 protected CompressionContext compressionContext = null;
53 protected boolean emptyCompressionContext = true;
54
55
56
57
58 public ReaderBase() {
59 }
60
61 @Override
62 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
63 throws IOException {
64 this.conf = conf;
65 this.path = path;
66 this.fs = fs;
67 this.fileLength = this.fs.getFileStatus(path).getLen();
68 this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
69 HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
70 String cellCodecClsName = initReader(stream);
71
72 boolean compression = hasCompression();
73 if (compression) {
74
75 try {
76 if (compressionContext == null) {
77 compressionContext = new CompressionContext(LRUDictionary.class,
78 FSUtils.isRecoveredEdits(path), hasTagCompression());
79 } else {
80 compressionContext.clear();
81 }
82 } catch (Exception e) {
83 throw new IOException("Failed to initialize CompressionContext", e);
84 }
85 }
86 initAfterCompression(cellCodecClsName);
87 }
88
89 @Override
90 public HLog.Entry next() throws IOException {
91 return next(null);
92 }
93
94 @Override
95 public HLog.Entry next(HLog.Entry reuse) throws IOException {
96 HLog.Entry e = reuse;
97 if (e == null) {
98 e = new HLog.Entry(new HLogKey(), new WALEdit());
99 }
100 if (compressionContext != null) {
101 e.setCompressionContext(compressionContext);
102 }
103
104 boolean hasEntry = false;
105 try {
106 hasEntry = readNext(e);
107 } catch (IllegalArgumentException iae) {
108 TableName tableName = e.getKey().getTablename();
109 if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
110
111 LOG.info("Got an old ROOT edit, ignoring ");
112 return next(e);
113 }
114 else throw iae;
115 }
116 edit++;
117 if (compressionContext != null && emptyCompressionContext) {
118 emptyCompressionContext = false;
119 }
120 return hasEntry ? e : null;
121 }
122
123 @Override
124 public void seek(long pos) throws IOException {
125 if (compressionContext != null && emptyCompressionContext) {
126 while (next() != null) {
127 if (getPosition() == pos) {
128 emptyCompressionContext = false;
129 break;
130 }
131 }
132 }
133 seekOnFs(pos);
134 }
135
136
137
138
139
140
141 protected abstract String initReader(FSDataInputStream stream) throws IOException;
142
143
144
145
146 protected abstract void initAfterCompression() throws IOException;
147
148
149
150
151
152 protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
153
154
155
156 protected abstract boolean hasCompression();
157
158
159
160
161 protected abstract boolean hasTagCompression();
162
163
164
165
166
167
168 protected abstract boolean readNext(HLog.Entry e) throws IOException;
169
170
171
172
173 protected abstract void seekOnFs(long pos) throws IOException;
174
175 @Override
176 public WALTrailer getWALTrailer() {
177 return null;
178 }
179 }