1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.InterProcessLock;
32 import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
33 import org.apache.hadoop.hbase.InterProcessReadWriteLock;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42 import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
43 import org.apache.zookeeper.KeeperException;
44
45 import com.google.protobuf.InvalidProtocolBufferException;
46
47
48
49
50 @InterfaceAudience.Private
51 public abstract class TableLockManager {
52
53 private static final Log LOG = LogFactory.getLog(TableLockManager.class);
54
55
56 public static final String TABLE_LOCK_ENABLE =
57 "hbase.table.lock.enable";
58
59
60 private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
61
62
63 protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
64 "hbase.table.write.lock.timeout.ms";
65
66
67 protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
68 "hbase.table.read.lock.timeout.ms";
69
70 protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
71 600 * 1000;
72
73 protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
74 600 * 1000;
75
76 public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
77
78 public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
79 600 * 1000;
80
81
82
83
84 @InterfaceAudience.Private
85 public interface TableLock {
86
87
88
89
90
91
92 void acquire() throws IOException;
93
94
95
96
97
98 void release() throws IOException;
99 }
100
101
102
103
104
105
106
107 public abstract TableLock writeLock(TableName tableName, String purpose);
108
109
110
111
112
113
114
115 public abstract TableLock readLock(TableName tableName, String purpose);
116
117
118
119
120
121
122
123 public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
124
125
126
127
128
129
130
131
132 public abstract void reapAllExpiredLocks() throws IOException;
133
134
135
136
137
138
139
140
141
142
143 public abstract void reapWriteLocks() throws IOException;
144
145
146
147
148
149
150
151 public abstract void tableDeleted(TableName tableName)
152 throws IOException;
153
154
155
156
157 public static TableLockManager createTableLockManager(Configuration conf,
158 ZooKeeperWatcher zkWatcher, ServerName serverName) {
159
160 if (conf.getBoolean(TABLE_LOCK_ENABLE,
161 DEFAULT_TABLE_LOCK_ENABLE)) {
162 long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
163 DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
164 long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
165 DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
166 long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
167 DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
168
169 return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
170 }
171
172 return new NullTableLockManager();
173 }
174
175
176
177
178 @InterfaceAudience.Private
179 public static class NullTableLockManager extends TableLockManager {
180 static class NullTableLock implements TableLock {
181 @Override
182 public void acquire() throws IOException {
183 }
184 @Override
185 public void release() throws IOException {
186 }
187 }
188 @Override
189 public TableLock writeLock(TableName tableName, String purpose) {
190 return new NullTableLock();
191 }
192 @Override
193 public TableLock readLock(TableName tableName, String purpose) {
194 return new NullTableLock();
195 }
196 @Override
197 public void reapAllExpiredLocks() throws IOException {
198 }
199 @Override
200 public void reapWriteLocks() throws IOException {
201 }
202 @Override
203 public void tableDeleted(TableName tableName) throws IOException {
204 }
205 @Override
206 public void visitAllLocks(MetadataHandler handler) throws IOException {
207 }
208 }
209
210
211 public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
212 int pblen = ProtobufUtil.lengthOfPBMagic();
213 if (bytes == null || bytes.length < pblen) {
214 return null;
215 }
216 try {
217 ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
218 bytes, pblen, bytes.length - pblen).build();
219 return data;
220 } catch (InvalidProtocolBufferException ex) {
221 LOG.warn("Exception in deserialization", ex);
222 }
223 return null;
224 }
225
226
227
228
229 @InterfaceAudience.Private
230 private static class ZKTableLockManager extends TableLockManager {
231
232 private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
233 @Override
234 public void handleMetadata(byte[] ownerMetadata) {
235 if (!LOG.isDebugEnabled()) {
236 return;
237 }
238 ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
239 if (data == null) {
240 return;
241 }
242 LOG.debug("Table is locked by " +
243 String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
244 "purpose=%s, isShared=%s, createTime=%s]",
245 data.getTableName().getNamespace().toStringUtf8(),
246 data.getTableName().getQualifier().toStringUtf8(),
247 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
248 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
249 }
250 };
251
252 private static class TableLockImpl implements TableLock {
253 long lockTimeoutMs;
254 TableName tableName;
255 InterProcessLock lock;
256 boolean isShared;
257 ZooKeeperWatcher zkWatcher;
258 ServerName serverName;
259 String purpose;
260
261 public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
262 ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
263 this.tableName = tableName;
264 this.zkWatcher = zkWatcher;
265 this.serverName = serverName;
266 this.lockTimeoutMs = lockTimeoutMs;
267 this.isShared = isShared;
268 this.purpose = purpose;
269 }
270
271 @Override
272 public void acquire() throws IOException {
273 if (LOG.isTraceEnabled()) {
274 LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
275 " lock on: " + tableName + " for:" + purpose);
276 }
277
278 lock = createTableLock();
279 try {
280 if (lockTimeoutMs == -1) {
281
282 lock.acquire();
283 } else {
284 if (!lock.tryAcquire(lockTimeoutMs)) {
285 throw new LockTimeoutException("Timed out acquiring " +
286 (isShared ? "read" : "write") + "lock for table:" + tableName +
287 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
288 }
289 }
290 } catch (InterruptedException e) {
291 LOG.warn("Interrupted acquiring a lock for " + tableName, e);
292 Thread.currentThread().interrupt();
293 throw new InterruptedIOException("Interrupted acquiring a lock");
294 }
295 if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
296 + " lock on " + tableName + " for " + purpose);
297 }
298
299 @Override
300 public void release() throws IOException {
301 if (LOG.isTraceEnabled()) {
302 LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
303 + " lock on " + tableName);
304 }
305 if (lock == null) {
306 throw new IllegalStateException("Table " + tableName +
307 " is not locked!");
308 }
309
310 try {
311 lock.release();
312 } catch (InterruptedException e) {
313 LOG.warn("Interrupted while releasing a lock for " + tableName);
314 throw new InterruptedIOException();
315 }
316 if (LOG.isTraceEnabled()) {
317 LOG.trace("Released table lock on " + tableName);
318 }
319 }
320
321 private InterProcessLock createTableLock() {
322 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
323 tableName.getNameAsString());
324
325 ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
326 .setTableName(ProtobufUtil.toProtoTableName(tableName))
327 .setLockOwner(ProtobufUtil.toServerName(serverName))
328 .setThreadId(Thread.currentThread().getId())
329 .setPurpose(purpose)
330 .setIsShared(isShared)
331 .setCreateTime(EnvironmentEdgeManager.currentTimeMillis()).build();
332 byte[] lockMetadata = toBytes(data);
333
334 InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
335 METADATA_HANDLER);
336 return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
337 }
338 }
339
340 private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
341 return ProtobufUtil.prependPBMagic(data.toByteArray());
342 }
343
344 private final ServerName serverName;
345 private final ZooKeeperWatcher zkWatcher;
346 private final long writeLockTimeoutMs;
347 private final long readLockTimeoutMs;
348 private final long lockExpireTimeoutMs;
349
350
351
352
353
354
355
356
357
358
359
360 public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
361 ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
362 this.zkWatcher = zkWatcher;
363 this.serverName = serverName;
364 this.writeLockTimeoutMs = writeLockTimeoutMs;
365 this.readLockTimeoutMs = readLockTimeoutMs;
366 this.lockExpireTimeoutMs = lockExpireTimeoutMs;
367 }
368
369 @Override
370 public TableLock writeLock(TableName tableName, String purpose) {
371 return new TableLockImpl(tableName, zkWatcher,
372 serverName, writeLockTimeoutMs, false, purpose);
373 }
374
375 public TableLock readLock(TableName tableName, String purpose) {
376 return new TableLockImpl(tableName, zkWatcher,
377 serverName, readLockTimeoutMs, true, purpose);
378 }
379
380 public void visitAllLocks(MetadataHandler handler) throws IOException {
381 for (String tableName : getTableNames()) {
382 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
383 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
384 zkWatcher, tableLockZNode, null);
385 lock.readLock(null).visitLocks(handler);
386 lock.writeLock(null).visitLocks(handler);
387 }
388 }
389
390 private List<String> getTableNames() throws IOException {
391
392 List<String> tableNames;
393 try {
394 tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
395 } catch (KeeperException e) {
396 LOG.error("Unexpected ZooKeeper error when listing children", e);
397 throw new IOException("Unexpected ZooKeeper exception", e);
398 }
399 return tableNames;
400 }
401
402 @Override
403 public void reapWriteLocks() throws IOException {
404
405 try {
406 for (String tableName : getTableNames()) {
407 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
408 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
409 zkWatcher, tableLockZNode, null);
410 lock.writeLock(null).reapAllLocks();
411 }
412 } catch (IOException ex) {
413 throw ex;
414 } catch (Exception ex) {
415 LOG.warn("Caught exception while reaping table write locks", ex);
416 }
417 }
418
419 @Override
420 public void reapAllExpiredLocks() throws IOException {
421
422 try {
423 for (String tableName : getTableNames()) {
424 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
425 ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
426 zkWatcher, tableLockZNode, null);
427 lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
428 lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
429 }
430 } catch (IOException ex) {
431 throw ex;
432 } catch (Exception ex) {
433 throw new IOException(ex);
434 }
435 }
436
437 @Override
438 public void tableDeleted(TableName tableName) throws IOException {
439
440 String tableNameStr = tableName.getNameAsString();
441 String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
442 try {
443 ZKUtil.deleteNode(zkWatcher, tableLockZNode);
444 } catch (KeeperException ex) {
445 if (ex.code() == KeeperException.Code.NOTEMPTY) {
446
447
448 LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
449 + tableLockZNode);
450 return;
451 }
452 throw new IOException(ex);
453 }
454 }
455 }
456 }