1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver.wal;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.io.InterruptedIOException;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FSDataInputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
37 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
38 import org.apache.hadoop.hbase.util.CancelableProgressable;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40
41 @InterfaceAudience.Private
42 public class HLogFactory {
43 private static final Log LOG = LogFactory.getLog(HLogFactory.class);
44
45 public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
46 final Configuration conf) throws IOException {
47 return new FSHLog(fs, root, logName, conf);
48 }
49
50 public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
51 final String oldLogName, final Configuration conf) throws IOException {
52 return new FSHLog(fs, root, logName, oldLogName, conf);
53 }
54
55 public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
56 final Configuration conf, final List<WALActionsListener> listeners,
57 final String prefix) throws IOException {
58 return new FSHLog(fs, root, logName, conf, listeners, prefix);
59 }
60
61 public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
62 final Configuration conf, final List<WALActionsListener> listeners,
63 final String prefix) throws IOException {
64 return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
65 conf, listeners, false, prefix, true);
66 }
67
68
69
70
71 private static Class<? extends Reader> logReaderClass;
72
73 static void resetLogReaderClass() {
74 logReaderClass = null;
75 }
76
77 public static HLog.Reader createReader(final FileSystem fs,
78 final Path path, Configuration conf) throws IOException {
79 return createReader(fs, path, conf, null);
80 }
81
82
83
84
85
86
87
88
89 public static HLog.Reader createReader(final FileSystem fs, final Path path,
90 Configuration conf, CancelableProgressable reporter) throws IOException {
91 return createReader(fs, path, conf, reporter, true);
92 }
93
94 public static HLog.Reader createReader(final FileSystem fs, final Path path,
95 Configuration conf, CancelableProgressable reporter, boolean allowCustom)
96 throws IOException {
97 if (allowCustom && (logReaderClass == null)) {
98 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
99 ProtobufLogReader.class, Reader.class);
100 }
101 Class<? extends Reader> lrClass = allowCustom ? logReaderClass : ProtobufLogReader.class;
102
103 try {
104
105
106
107 long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
108 long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
109 int nbAttempt = 0;
110 while (true) {
111 try {
112 if (lrClass != ProtobufLogReader.class) {
113
114 HLog.Reader reader = lrClass.newInstance();
115 reader.init(fs, path, conf, null);
116 return reader;
117 } else {
118 FSDataInputStream stream = fs.open(path);
119
120
121
122
123 byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
124 boolean isPbWal = (stream.read(magic) == magic.length)
125 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
126 HLog.Reader reader =
127 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
128 reader.init(fs, path, conf, stream);
129 return reader;
130 }
131 } catch (IOException e) {
132 String msg = e.getMessage();
133 if (msg != null && (msg.contains("Cannot obtain block length")
134 || msg.contains("Could not obtain the last block")
135 || msg.matches("Blocklist for [^ ]* has changed.*"))) {
136 if (++nbAttempt == 1) {
137 LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
138 }
139 if (reporter != null && !reporter.progress()) {
140 throw new InterruptedIOException("Operation is cancelled");
141 }
142 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
143 LOG.error("Can't open after " + nbAttempt + " attempts and "
144 + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
145 + "ms " + " for " + path);
146 } else {
147 try {
148 Thread.sleep(nbAttempt < 3 ? 500 : 1000);
149 continue;
150 } catch (InterruptedException ie) {
151 InterruptedIOException iioe = new InterruptedIOException();
152 iioe.initCause(ie);
153 throw iioe;
154 }
155 }
156 }
157 throw e;
158 }
159 }
160 } catch (IOException ie) {
161 throw ie;
162 } catch (Exception e) {
163 throw new IOException("Cannot get log reader", e);
164 }
165 }
166
167
168
169
170 private static Class<? extends Writer> logWriterClass;
171
172 static void resetLogWriterClass() {
173 logWriterClass = null;
174 }
175
176
177
178
179
180
181 public static HLog.Writer createWALWriter(final FileSystem fs,
182 final Path path, Configuration conf) throws IOException {
183 return createWriter(fs, path, conf, false);
184 }
185
186 public static HLog.Writer createRecoveredEditsWriter(final FileSystem fs,
187 final Path path, Configuration conf) throws IOException {
188 return createWriter(fs, path, conf, true);
189 }
190
191 private static HLog.Writer createWriter(final FileSystem fs,
192 final Path path, Configuration conf, boolean overwritable)
193 throws IOException {
194 try {
195 if (logWriterClass == null) {
196 logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
197 ProtobufLogWriter.class, Writer.class);
198 }
199 HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
200 writer.init(fs, path, conf, overwritable);
201 return writer;
202 } catch (Exception e) {
203 throw new IOException("cannot get log writer", e);
204 }
205 }
206 }