001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.dependency.hcat.HCatMessageHandler;
031import org.apache.oozie.jms.JMSConnectionInfo;
032import org.apache.oozie.util.HCatURI;
033import org.apache.oozie.util.MappingRule;
034import org.apache.oozie.util.XLog;
035
036public class HCatAccessorService implements Service {
037
038    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
039    public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
040
041    private static XLog LOG;
042    private static String DELIMITER = "#";
043    private Configuration conf;
044    private JMSAccessorService jmsService;
045    private List<MappingRule> mappingRules;
046    private JMSConnectionInfo defaultJMSConnInfo;
047    /**
048     * Map of publisher(host:port) to JMS connection info
049     */
050    private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
051    /**
052     * List of non publishers(host:port)
053     */
054    private Set<String> nonJMSPublishers;
055    /**
056     * Mapping of table to the topic name for the table
057     */
058    private Map<String, String> registeredTopicsMap;
059
060    @Override
061    public void init(Services services) throws ServiceException {
062        LOG = XLog.getLog(getClass());
063        conf = services.getConf();
064        this.jmsService = services.get(JMSAccessorService.class);
065        initializeMappingRules();
066        this.nonJMSPublishers = new HashSet<String>();
067        this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
068        this.registeredTopicsMap = new HashMap<String, String>();
069    }
070
071    private void initializeMappingRules() {
072        String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES);
073        if (connections != null) {
074            mappingRules = new ArrayList<MappingRule>(connections.length);
075            for (String connection : connections) {
076                String[] values = connection.split("=", 2);
077                String key = values[0].trim();
078                String value = values[1].trim();
079                if (key.equals("default")) {
080                    defaultJMSConnInfo = new JMSConnectionInfo(value);
081                }
082                else {
083                    mappingRules.add(new MappingRule(key, value));
084                }
085            }
086        }
087        else {
088            LOG.warn("No JMS connection defined");
089        }
090    }
091
092    /**
093     * Determine whether a given source URI publishes JMS messages
094     *
095     * @param sourceURI URI of the publisher
096     * @return true if we have JMS connection information for the source URI, else false
097     */
098    public boolean isKnownPublisher(URI sourceURI) {
099        if (nonJMSPublishers.contains(sourceURI.getAuthority())) {
100            return true;
101        }
102        else {
103            JMSConnectionInfo connInfo = publisherJMSConnInfoMap.get(sourceURI.getAuthority());
104            return connInfo == null ? (getJMSConnectionInfo(sourceURI) != null) : true;
105        }
106    }
107
108    /**
109     * Given a publisher host:port return the connection details of JMS server that the publisher
110     * publishes to
111     *
112     * @param publisherURI URI of the publisher
113     * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to
114     */
115    public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
116        String publisherAuthority = publisherURI.getAuthority();
117        JMSConnectionInfo connInfo = null;
118        if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
119            connInfo = publisherJMSConnInfoMap.get(publisherAuthority);
120        }
121        else {
122            String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
123            for (MappingRule mr : mappingRules) {
124                String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
125                if (jndiPropertiesString != null) {
126                    connInfo = new JMSConnectionInfo(jndiPropertiesString);
127                    publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
128                    LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
129                    break;
130                }
131            }
132            if (connInfo == null && defaultJMSConnInfo != null) {
133                connInfo = defaultJMSConnInfo;
134                publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
135                LOG.info("Adding hcat server [{0}] to the list of JMS publishers", schemeWithAuthority);
136            }
137            else {
138                nonJMSPublishers.add(publisherAuthority);
139                LOG.info("Adding hcat server [{0}] to the list of non JMS publishers", schemeWithAuthority);
140            }
141
142        }
143        return connInfo;
144    }
145
146    /**
147     * Check if we are already listening to the JMS topic for the table in the given hcatURI
148     *
149     * @param hcatURI hcatalog partition URI
150     * @return true if registered to a JMS topic for the table in the given hcatURI
151     */
152    public boolean isRegisteredForNotification(HCatURI hcatURI) {
153        return registeredTopicsMap.containsKey(getKeyForRegisteredTopicsMap(hcatURI));
154    }
155
156    /**
157     * Register for notifications on a JMS topic for the specified hcatalog table.
158     *
159     * @param hcatURI hcatalog partition URI
160     * @param topic JMS topic to register to
161     * @param msgHandler Handler which will process the messages received on the topic
162     */
163    public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
164        JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
165        jmsService.registerForNotification(connInfo, topic, msgHandler);
166        registeredTopicsMap.put(
167                getKeyForRegisteredTopicsMap(hcatURI), topic);
168    }
169
170    public void unregisterFromNotification(HCatURI hcatURI) {
171        String topic = registeredTopicsMap.remove(getKeyForRegisteredTopicsMap(hcatURI));
172        if (topic != null) {
173            JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
174            jmsService.unregisterFromNotification(connInfo, topic);
175        }
176    }
177
178    public void unregisterFromNotification(String server, String database, String table) {
179        String key = server + DELIMITER + database + DELIMITER + table;
180        String topic = registeredTopicsMap.remove(key);
181        if (topic != null) {
182            try {
183                JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
184                jmsService.unregisterFromNotification(connInfo, topic);
185            }
186            catch (URISyntaxException e) {
187                LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
188            }
189        }
190    }
191
192    private String getKeyForRegisteredTopicsMap(HCatURI hcatURI) {
193        return hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
194                + DELIMITER + hcatURI.getTable();
195    }
196
197    @Override
198    public void destroy() {
199        publisherJMSConnInfoMap.clear();
200    }
201
202    @Override
203    public Class<? extends Service> getInterface() {
204        return HCatAccessorService.class;
205    }
206
207}