001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.Collection;
021import java.util.Map;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.util.ReflectionUtils;
025import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
026import org.apache.oozie.dependency.hcat.HCatDependencyCache;
027import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
028import org.apache.oozie.util.HCatURI;
029import org.apache.oozie.util.XLog;
030
031/**
032 * Module that functions like a caching service to maintain partition dependency mappings
033 */
034public class PartitionDependencyManagerService implements Service {
035
036    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
037    public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
038
039    private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
040
041    private HCatDependencyCache dependencyCache;
042
043    @Override
044    public void init(Services services) throws ServiceException {
045        init(services.getConf());
046    }
047
048    private void init(Configuration conf) throws ServiceException {
049        Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null);
050        dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache()
051                : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null);
052        dependencyCache.init(conf);
053        LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
054                .getName());
055    }
056
057    @Override
058    public void destroy() {
059        dependencyCache.destroy();
060    }
061
062    @Override
063    public Class<? extends Service> getInterface() {
064        return PartitionDependencyManagerService.class;
065    }
066
067    /**
068     * Add a missing partition dependency and the actionID waiting on it
069     *
070     * @param hcatURI dependency URI
071     * @param actionID ID of action which is waiting for the dependency
072     */
073    public void addMissingDependency(HCatURI hcatURI, String actionID) {
074        dependencyCache.addMissingDependency(hcatURI, actionID);
075    }
076
077    /**
078     * Remove a missing partition dependency associated with a actionID
079     *
080     * @param hcatURI dependency URI
081     * @param actionID ID of action which is waiting for the dependency
082     * @return true if successful, else false
083     */
084    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
085        return dependencyCache.removeMissingDependency(hcatURI, actionID);
086    }
087
088    /**
089     * Get the list of actionIDs waiting for a partition
090     *
091     * @param hcatURI dependency URI
092     * @return list of actionIDs
093     */
094    public Collection<String> getWaitingActions(HCatURI hcatURI) {
095        return dependencyCache.getWaitingActions(hcatURI);
096    }
097
098    /**
099     * Mark a partition dependency as available
100     *
101     * @param server host:port of the server
102     * @param db name of the database
103     * @param table name of the table
104     * @param partitions list of available partitions
105     * @return list of actionIDs for which the dependency is now available
106     */
107    public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
108        Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
109                partitions);
110        if (actionsWithAvailableDep != null) {
111            for (String actionID : actionsWithAvailableDep) {
112                boolean ret = Services.get().get(CallableQueueService.class)
113                        .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
114                if (ret == false) {
115                    XLog.getLog(getClass()).warn(
116                            "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
117                                    + actionID + ".Most possibly command queue is full. Queue size is :"
118                                    + Services.get().get(CallableQueueService.class).queueSize());
119                }
120            }
121        }
122    }
123
124    /**
125     * Get a list of available dependency URIs for a actionID
126     *
127     * @param actionID action id
128     * @return list of available dependency URIs
129     */
130    public Collection<String> getAvailableDependencyURIs(String actionID) {
131        return dependencyCache.getAvailableDependencyURIs(actionID);
132    }
133
134    /**
135     * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed.
136     *
137     * @param actionID action id
138     * @param dependencyURIs set of dependency URIs
139     * @return true if successful, else false
140     */
141    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
142        return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
143    }
144
145}