001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Properties;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.AppType;
029import org.apache.oozie.client.event.SLAEvent;
030import org.apache.oozie.event.WorkflowJobEvent;
031import org.apache.oozie.executor.jpa.BundleJobGetForUserJPAExecutor;
032import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserJPAExecutor;
033import org.apache.oozie.executor.jpa.JPAExecutorException;
034import org.apache.oozie.executor.jpa.WorkflowJobGetForUserJPAExecutor;
035import org.apache.oozie.util.XLog;
036
037
038/**
039 * JMS Topic service to retrieve topic names from events or job id
040 *
041 */
042public class JMSTopicService implements Service {
043
044    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSTopicService.";
045    public static final String TOPIC_NAME = CONF_PREFIX + "topic.name";
046    public static final String TOPIC_PREFIX = CONF_PREFIX + "topic.prefix";
047    private static XLog LOG;
048    private Configuration conf;
049    private final Map<String, String> topicMap = new HashMap<String, String>();
050    private static final List<String> JOB_TYPE_CONSTANTS = new ArrayList<String>();
051    private static final List<String> ALLOWED_TOPIC_NAMES = new ArrayList<String>();
052    private JPAService jpaService = Services.get().get(JPAService.class);
053    private String defaultTopicName = TopicType.USER.value;
054    private String topicPrefix;
055
056    static {
057        ALLOWED_TOPIC_NAMES.add(TopicType.USER.value);
058        ALLOWED_TOPIC_NAMES.add(TopicType.JOBID.value);
059    }
060
061    static {
062        JOB_TYPE_CONSTANTS.add(JobType.WORKFLOW.value);
063        JOB_TYPE_CONSTANTS.add(JobType.COORDINATOR.value);
064        JOB_TYPE_CONSTANTS.add(JobType.BUNDLE.value);
065    }
066
067    public static enum JobType {
068        WORKFLOW("WORKFLOW"), COORDINATOR("COORDINATOR"), BUNDLE("BUNDLE");
069        private String value;
070
071        JobType(String value) {
072            this.value = value;
073        }
074
075        String getValue() {
076            return value;
077        }
078
079    }
080
081    public static enum TopicType {
082        USER("${username}"), JOBID("${jobId}");
083
084        private String value;
085
086        TopicType(String value) {
087            this.value = value;
088        }
089
090        String getValue() {
091            return value;
092        }
093
094    }
095
096    @Override
097    public void init(Services services) throws ServiceException {
098        LOG = XLog.getLog(getClass());
099        conf = services.getConf();
100        parseTopicConfiguration();
101        topicPrefix = conf.get(TOPIC_PREFIX, "");
102    }
103
104    private void parseTopicConfiguration() throws ServiceException {
105        String topicName = conf.get(TOPIC_NAME, "default=" + TopicType.USER.value);
106        if (topicName == null) {
107            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic cannot be null ");
108        }
109        LOG.info("Topic Name is [{0}]", topicName);
110        String[] topic = topicName.trim().split(",");
111        for (int i = 0; i < topic.length; i++) {
112            String[] split = topic[i].trim().split("=");
113            if (split.length == 2) {
114                split[0] = split[0].trim();
115                split[1] = split[1].trim();
116                if (split[0].equals("default")) {
117                    if (!ALLOWED_TOPIC_NAMES.contains(split[1])) {
118                        throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Topic name " + split[1]
119                                + " not allowed in default; allowed" + "topics are " + ALLOWED_TOPIC_NAMES);
120                    }
121                    defaultTopicName = split[1];
122                }
123                else if (!JOB_TYPE_CONSTANTS.contains(split[0])) {
124                    throw new ServiceException(ErrorCode.E0100, getClass().getName(),
125                            "Incorrect job type for defining JMS topic: " + split[0] + " ;" + "allowed job types are "
126                                    + JOB_TYPE_CONSTANTS);
127                }
128                else if (!ALLOWED_TOPIC_NAMES.contains(split[1]) && split[1].contains("$")) {
129                    throw new ServiceException(ErrorCode.E0100, getClass().getName(), "JMS topic value " + split[1]
130                            + " " + "for a job type is incorrect " + "Correct values are " + ALLOWED_TOPIC_NAMES);
131                }
132                else {
133                    topicMap.put(split[0], split[1]);
134                }
135            }
136            else {
137                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Property " + topic[i]
138                        + "has incorrect syntax; It should be specified as key value pair");
139            }
140        }
141    }
142
143    /**
144     * Retrieve topic from Job id
145     * @param jobId
146     * @return
147     * @throws JPAExecutorException
148     */
149    public String getTopic(String jobId) throws JPAExecutorException {
150        String topicName = null;
151        if (jobId.contains("-W@")) {
152            jobId = jobId.substring(0, jobId.indexOf('@'));
153            topicName = getTopicForWorkflow(jobId);
154        }
155        else if (jobId.endsWith("-W")) {
156            topicName = getTopicForWorkflow(jobId);
157        }
158        else if (jobId.contains("-C@")) {
159            jobId = jobId.substring(0, jobId.indexOf('@'));
160            topicName = getTopicForCoordinator(jobId);
161        }
162        else if (jobId.endsWith("C")) {
163            topicName = getTopicForCoordinator(jobId);
164        }
165        else if (jobId.contains("-B_")) {
166            jobId = jobId.substring(0, jobId.indexOf('_'));
167            topicName = getTopicForBundle(jobId);
168        }
169
170        else if (jobId.endsWith("B")) {
171            topicName = getTopicForBundle(jobId);
172        }
173        return topicPrefix + topicName;
174    }
175
176    /**
177     * Retrieve Topic
178     *
179     * @param appType
180     * @param user
181     * @param jobId
182     * @param parentJobId
183     * @return topicName
184     */
185
186    public String getTopic(AppType appType, String user, String jobId, String parentJobId) {
187        String topicName = null;
188        String id = jobId;
189        if (appType == AppType.COORDINATOR_JOB || appType == AppType.COORDINATOR_ACTION) {
190            topicName = topicMap.get(JobType.COORDINATOR.value);
191            if (appType == AppType.COORDINATOR_ACTION) {
192                id = parentJobId;
193            }
194        }
195        else if (appType == AppType.WORKFLOW_JOB || appType == AppType.WORKFLOW_ACTION) {
196            topicName = topicMap.get(JobType.WORKFLOW);
197        }
198
199        if (topicName == null) {
200            if (defaultTopicName.equals(TopicType.USER.value)) {
201                topicName = user;
202            }
203            else if (defaultTopicName.equals(TopicType.JOBID.value)) {
204                topicName = id;
205            }
206        }
207        return topicPrefix + topicName;
208    }
209
210    private String getTopicForWorkflow(String jobId) throws JPAExecutorException {
211        String topicName = topicMap.get(JobType.WORKFLOW.value);
212        if (topicName == null) {
213            topicName = defaultTopicName;
214        }
215        if (topicName.equals(TopicType.USER.value)) {
216            topicName = jpaService.execute(new WorkflowJobGetForUserJPAExecutor(jobId));
217        }
218        else if (topicName.equals(TopicType.JOBID.value)) {
219            topicName = jobId;
220        }
221        return topicName;
222
223    }
224
225    private String getTopicForCoordinator(String jobId) throws JPAExecutorException {
226        String topicName = topicMap.get(JobType.COORDINATOR.value);
227
228        if (topicName == null) {
229            topicName = defaultTopicName;
230        }
231        if (topicName.equals(TopicType.USER.value)) {
232            topicName = jpaService.execute(new CoordinatorJobGetForUserJPAExecutor(jobId));
233        }
234        else if (topicName.equals(TopicType.JOBID.value)) {
235            topicName = jobId;
236        }
237        return topicName;
238    }
239
240    private String getTopicForBundle(String jobId) throws JPAExecutorException {
241        String topicName = topicMap.get(JobType.BUNDLE.value);
242        if (topicName == null) {
243            topicName = defaultTopicName;
244        }
245        if (topicName.equals(TopicType.USER.value)) {
246            topicName = jpaService.execute(new BundleJobGetForUserJPAExecutor(jobId));
247        }
248        else if (topicName.equals(TopicType.JOBID.value)) {
249            topicName = jobId;
250        }
251        return topicName;
252    }
253
254    public Properties getTopicPatternProperties() {
255        Properties props = new Properties();
256        String wfTopic = topicMap.get(JobType.WORKFLOW.value);
257        wfTopic = (wfTopic != null) ? wfTopic : defaultTopicName;
258        props.put(AppType.WORKFLOW_JOB, wfTopic);
259        props.put(AppType.WORKFLOW_ACTION, wfTopic);
260
261        String coordTopic = topicMap.get(JobType.COORDINATOR.value);
262        coordTopic = (coordTopic != null) ? coordTopic : defaultTopicName;
263        props.put(AppType.COORDINATOR_JOB, coordTopic);
264        props.put(AppType.COORDINATOR_ACTION, coordTopic);
265
266        String bundleTopic = topicMap.get(JobType.BUNDLE.value);
267        bundleTopic = (bundleTopic != null) ? bundleTopic : defaultTopicName;
268        props.put(AppType.BUNDLE_JOB, bundleTopic);
269        props.put(AppType.BUNDLE_ACTION, bundleTopic);
270
271        return props;
272    }
273
274    public String getTopicPrefix() {
275        return topicPrefix;
276    }
277
278    @Override
279    public void destroy() {
280        topicMap.clear();
281
282    }
283
284    @Override
285    public Class<? extends Service> getInterface() {
286        return JMSTopicService.class;
287    }
288
289}