001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import java.util.HashMap;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.ReentrantReadWriteLock;
023import java.util.concurrent.locks.Lock;
024
025/**
026 * In memory resource locking that provides READ/WRITE lock capabilities.
027 */
028public class MemoryLocks {
029    final private HashMap<String, ReentrantReadWriteLock> locks = new HashMap<String, ReentrantReadWriteLock>();
030
031    private static enum Type {
032        READ, WRITE
033    }
034
035    /**
036     * Lock token returned when obtaining a lock, the token must be released when the lock is not needed anymore.
037     */
038    public class LockToken {
039        private final ReentrantReadWriteLock rwLock;
040        private final java.util.concurrent.locks.Lock lock;
041        private final String resource;
042
043        private LockToken(ReentrantReadWriteLock rwLock, java.util.concurrent.locks.Lock lock, String resource) {
044            this.rwLock = rwLock;
045            this.lock = lock;
046            this.resource = resource;
047        }
048
049        /**
050         * Release the lock.
051         */
052        public void release() {
053            int val = rwLock.getQueueLength();
054            if (val == 0) {
055                synchronized (locks) {
056                    locks.remove(resource);
057                }
058            }
059            lock.unlock();
060        }
061    }
062
063    /**
064     * Return the number of active locks.
065     *
066     * @return the number of active locks.
067     */
068    public int size() {
069        return locks.size();
070    }
071
072    /**
073     * Obtain a READ lock for a source.
074     *
075     * @param resource resource name.
076     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
077     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
078     * @throws InterruptedException thrown if the thread was interrupted while waiting.
079     */
080    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
081        return getLock(resource, Type.READ, wait);
082    }
083
084    /**
085     * Obtain a WRITE lock for a source.
086     *
087     * @param resource resource name.
088     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
089     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
090     * @throws InterruptedException thrown if the thread was interrupted while waiting.
091     */
092    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
093        return getLock(resource, Type.WRITE, wait);
094    }
095
096    private LockToken getLock(String resource, Type type, long wait) throws InterruptedException {
097        ReentrantReadWriteLock lockEntry;
098        synchronized (locks) {
099            if (locks.containsKey(resource)) {
100                lockEntry = locks.get(resource);
101            }
102            else {
103                lockEntry = new ReentrantReadWriteLock(true);
104                locks.put(resource, lockEntry);
105            }
106        }
107
108        Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
109
110        if (wait == -1) {
111            lock.lock();
112        }
113        else {
114            if (wait > 0) {
115                if (!lock.tryLock(wait, TimeUnit.MILLISECONDS)) {
116                    return null;
117                }
118            }
119            else {
120                if (!lock.tryLock()) {
121                    return null;
122                }
123            }
124        }
125        synchronized (locks) {
126            if (!locks.containsKey(resource)) {
127                locks.put(resource, lockEntry);
128            }
129        }
130        return new LockToken(lockEntry, lock, resource);
131    }
132
133}