001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla.listener;
020
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Properties;
025import java.util.Set;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import javax.mail.Address;
030import javax.mail.Message;
031import javax.mail.MessagingException;
032import javax.mail.NoSuchProviderException;
033import javax.mail.SendFailedException;
034import javax.mail.Session;
035import javax.mail.Transport;
036import javax.mail.internet.AddressException;
037import javax.mail.internet.InternetAddress;
038import javax.mail.internet.MimeMessage;
039import javax.mail.internet.MimeMessage.RecipientType;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.oozie.action.email.EmailActionExecutor;
043import org.apache.oozie.action.email.EmailActionExecutor.JavaMailAuthenticator;
044import org.apache.oozie.client.event.SLAEvent;
045import org.apache.oozie.sla.listener.SLAEventListener;
046import org.apache.oozie.sla.service.SLAService;
047import org.apache.oozie.util.XLog;
048
049import com.google.common.annotations.VisibleForTesting;
050import com.google.common.cache.CacheBuilder;
051import com.google.common.cache.CacheLoader;
052import com.google.common.cache.LoadingCache;
053
054public class SLAEmailEventListener extends SLAEventListener {
055
056    public static final String SMTP_CONNECTION_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.connectiontimeout";
057    public static final String SMTP_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "smtp.timeout";
058    public static final String BLACKLIST_CACHE_TIMEOUT = EmailActionExecutor.CONF_PREFIX + "blacklist.cachetimeout";
059    public static final String BLACKLIST_FAIL_COUNT = EmailActionExecutor.CONF_PREFIX + "blacklist.failcount";
060    public static final String OOZIE_BASE_URL = "oozie.base.url";
061    private Session session;
062    private String oozieBaseUrl;
063    private InternetAddress fromAddr;
064    private String ADDRESS_SEPARATOR = ",";
065    private LoadingCache<String, AtomicInteger> blackList;
066    private int blacklistFailCount;
067    private final String BLACKLIST_CACHE_TIMEOUT_DEFAULT = "1800"; // in sec. default to 30 min
068    private final String BLACKLIST_FAIL_COUNT_DEFAULT = "2"; // stop sending when fail count equals or exceeds
069    private final String SMTP_HOST_DEFAULT = "localhost";
070    private final String SMTP_PORT_DEFAULT = "25";
071    private final boolean SMTP_AUTH_DEFAULT = false;
072    private final String SMTP_SOURCE_DEFAULT = "oozie@localhost";
073    private final String SMTP_CONNECTION_TIMEOUT_DEFAULT = "5000";
074    private final String SMTP_TIMEOUT_DEFAULT = "5000";
075    private static XLog LOG = XLog.getLog(SLAEmailEventListener.class);
076    private Set<SLAEvent.EventStatus> alertEvents;
077    public static String EMAIL_BODY_FIELD_SEPARATER = " - ";
078    public static String EMAIL_BODY_FIELD_INDENT = "  ";
079    public static String EMAIL_BODY_HEADER_SEPARATER = ":";
080
081    public enum EmailField {
082        EVENT_STATUS("SLA Status"), APP_TYPE("App Type"), APP_NAME("App Name"), USER("User"), JOBID("Job ID"), PARENT_JOBID(
083                "Parent Job ID"), JOB_URL("Job URL"), PARENT_JOB_URL("Parent Job URL"), NOMINAL_TIME("Nominal Time"),
084                EXPECTED_START_TIME("Expected Start Time"), ACTUAL_START_TIME("Actual Start Time"),
085                EXPECTED_END_TIME("Expected End Time"), ACTUAL_END_TIME("Actual End Time"), EXPECTED_DURATION("Expected Duration (in mins)"),
086                ACTUAL_DURATION("Actual Duration (in mins)"), NOTIFICATION_MESSAGE("Notification Message"), UPSTREAM_APPS("Upstream Apps"),
087                JOB_STATUS("Job Status");
088        private String name;
089
090        private EmailField(String name) {
091            this.name = name;
092        }
093
094        public String toString() {
095            return name;
096        }
097    };
098
099    @Override
100    public void init(Configuration conf) throws Exception {
101
102        oozieBaseUrl = conf.get(OOZIE_BASE_URL);
103        // Get SMTP properties from the configuration used in Email Action
104        String smtpHost = conf.get(EmailActionExecutor.EMAIL_SMTP_HOST, SMTP_HOST_DEFAULT);
105        String smtpPort = conf.get(EmailActionExecutor.EMAIL_SMTP_PORT, SMTP_PORT_DEFAULT);
106        Boolean smtpAuth = conf.getBoolean(EmailActionExecutor.EMAIL_SMTP_AUTH, SMTP_AUTH_DEFAULT);
107        String smtpUser = conf.get(EmailActionExecutor.EMAIL_SMTP_USER, "");
108        String smtpPassword = conf.get(EmailActionExecutor.EMAIL_SMTP_PASS, "");
109        String smtpConnectTimeout = conf.get(SMTP_CONNECTION_TIMEOUT, SMTP_CONNECTION_TIMEOUT_DEFAULT);
110        String smtpTimeout = conf.get(SMTP_TIMEOUT, SMTP_TIMEOUT_DEFAULT);
111
112        int blacklistTimeOut = Integer.valueOf(conf.get(BLACKLIST_CACHE_TIMEOUT, BLACKLIST_CACHE_TIMEOUT_DEFAULT));
113        blacklistFailCount = Integer.valueOf(conf.get(BLACKLIST_FAIL_COUNT, BLACKLIST_FAIL_COUNT_DEFAULT));
114
115        // blacklist email addresses causing SendFailedException with cache timeout
116        blackList = CacheBuilder.newBuilder()
117                .expireAfterWrite(blacklistTimeOut, TimeUnit.SECONDS)
118                .build(new CacheLoader<String, AtomicInteger>() {
119                    public AtomicInteger load(String key) throws Exception {
120                        return new AtomicInteger();
121                    }
122                });
123
124        // Set SMTP properties
125        Properties properties = new Properties();
126        properties.setProperty("mail.smtp.host", smtpHost);
127        properties.setProperty("mail.smtp.port", smtpPort);
128        properties.setProperty("mail.smtp.auth", smtpAuth.toString());
129        properties.setProperty("mail.smtp.connectiontimeout", smtpConnectTimeout);
130        properties.setProperty("mail.smtp.timeout", smtpTimeout);
131
132        try {
133            fromAddr = new InternetAddress(conf.get("oozie.email.from.address", SMTP_SOURCE_DEFAULT));
134        }
135        catch (AddressException ae) {
136            LOG.error("Bad Source Address specified in oozie.email.from.address", ae);
137            throw ae;
138        }
139
140        if (!smtpAuth) {
141            session = Session.getInstance(properties);
142        }
143        else {
144            session = Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword));
145        }
146
147        alertEvents = new HashSet<SLAEvent.EventStatus>();
148        String alertEventsStr = conf.get(SLAService.CONF_ALERT_EVENTS);
149        if (alertEventsStr != null) {
150            String[] alertEvt = alertEventsStr.split(",", -1);
151            for (String evt : alertEvt) {
152                alertEvents.add(SLAEvent.EventStatus.valueOf(evt));
153            }
154        }
155    }
156
157    @Override
158    public void destroy() {
159    }
160
161    private void sendSLAEmail(SLAEvent event) throws Exception {
162        Message message = new MimeMessage(session);
163        setMessageHeader(message, event);
164        setMessageBody(message, event);
165        sendEmail(message);
166    }
167
168    @Override
169    public void onStartMiss(SLAEvent event) {
170        boolean flag = false;
171        if (event.getAlertEvents() == null) {
172            flag = alertEvents.contains(SLAEvent.EventStatus.START_MISS);
173        }
174        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.START_MISS.name())) {
175            flag = true;
176        }
177
178        if (flag) {
179            try {
180                sendSLAEmail(event);
181            }
182            catch (Exception e) {
183                LOG.error("Failed to send StartMiss alert email", e);
184            }
185        }
186    }
187
188    @Override
189    public void onEndMiss(SLAEvent event) {
190        boolean flag = false;
191        if (event.getAlertEvents() == null) {
192            flag = alertEvents.contains(SLAEvent.EventStatus.END_MISS);
193        }
194        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.END_MISS.name())) {
195            flag = true;
196        }
197
198        if (flag) {
199            try {
200                sendSLAEmail(event);
201            }
202            catch (Exception e) {
203                LOG.error("Failed to send EndMiss alert email", e);
204            }
205        }
206    }
207
208    @Override
209    public void onDurationMiss(SLAEvent event) {
210        boolean flag = false;
211        if (event.getAlertEvents() == null) {
212            flag = alertEvents.contains(SLAEvent.EventStatus.DURATION_MISS);
213        }
214        else if (event.getAlertEvents().contains(SLAEvent.EventStatus.DURATION_MISS.name())) {
215            flag = true;
216        }
217
218        if (flag) {
219            try {
220                sendSLAEmail(event);
221            }
222            catch (Exception e) {
223                LOG.error("Failed to send DurationMiss alert email", e);
224            }
225        }
226    }
227
228    private Address[] parseAddress(String str) {
229        Address[] addrs = null;
230        List<InternetAddress> addrList = new ArrayList<InternetAddress>();
231        String[] emails = str.split(ADDRESS_SEPARATOR, -1);
232
233        for (String email : emails) {
234            boolean isBlackListed = false;
235            AtomicInteger val = blackList.getIfPresent(email);
236            if(val != null){
237                isBlackListed = ( val.get() >= blacklistFailCount );
238            }
239            if (!isBlackListed) {
240                try {
241                    // turn on strict syntax check by setting 2nd argument true
242                    addrList.add(new InternetAddress(email, true));
243                }
244                catch (AddressException ae) {
245                    // simply skip bad address but do not throw exception
246                    LOG.error("Skipping bad destination address: " + email, ae);
247                }
248            }
249        }
250
251        if (addrList.size() > 0)
252            addrs = (Address[]) addrList.toArray(new InternetAddress[addrList.size()]);
253
254        return addrs;
255    }
256
257    private void setMessageHeader(Message msg, SLAEvent event) throws MessagingException {
258        Address[] from = new InternetAddress[] { fromAddr };
259        Address[] to;
260        StringBuilder subject = new StringBuilder();
261
262        to = parseAddress(event.getAlertContact());
263        if (to == null) {
264            LOG.error("Destination address is null or invalid, stop sending SLA alert email");
265            throw new IllegalArgumentException("Destination address is not specified properly");
266        }
267        subject.append("OOZIE - SLA ");
268        subject.append(event.getEventStatus().name());
269        subject.append(" (AppName=");
270        subject.append(event.getAppName());
271        subject.append(", JobID=");
272        subject.append(event.getId());
273        subject.append(")");
274
275        try {
276            msg.addFrom(from);
277            msg.addRecipients(RecipientType.TO, to);
278            msg.setSubject(subject.toString());
279        }
280        catch (MessagingException me) {
281            LOG.error("Message Exception in setting message header of SLA alert email", me);
282            throw me;
283        }
284    }
285
286    private void setMessageBody(Message msg, SLAEvent event) throws MessagingException {
287        StringBuilder body = new StringBuilder();
288        printHeading(body, "Status");
289        printField(body, EmailField.EVENT_STATUS.toString(), event.getEventStatus());
290        printField(body, EmailField.JOB_STATUS.toString(), event.getJobStatus());
291        printField(body, EmailField.NOTIFICATION_MESSAGE.toString(), event.getNotificationMsg());
292
293        printHeading(body, "Job Details");
294        printField(body, EmailField.APP_NAME.toString(), event.getAppName());
295        printField(body, EmailField.APP_TYPE.toString(), event.getAppType());
296        printField(body, EmailField.USER.toString(), event.getUser());
297        printField(body, EmailField.JOBID.toString(), event.getId());
298        printField(body, EmailField.JOB_URL.toString(), getJobLink(event.getId()));
299        printField(body, EmailField.PARENT_JOBID.toString(), event.getParentId() != null ? event.getParentId() : "N/A");
300        printField(body, EmailField.PARENT_JOB_URL.toString(),
301                event.getParentId() != null ? getJobLink(event.getParentId()) : "N/A");
302        printField(body, EmailField.UPSTREAM_APPS.toString(), event.getUpstreamApps());
303
304        printHeading(body, "SLA Details");
305        printField(body, EmailField.NOMINAL_TIME.toString(), event.getNominalTime());
306        printField(body, EmailField.EXPECTED_START_TIME.toString(), event.getExpectedStart());
307        printField(body, EmailField.ACTUAL_START_TIME.toString(), event.getActualStart());
308        printField(body, EmailField.EXPECTED_END_TIME.toString(), event.getExpectedEnd());
309        printField(body, EmailField.ACTUAL_END_TIME.toString(), event.getActualEnd());
310        printField(body, EmailField.EXPECTED_DURATION.toString(), getDurationInMins(event.getExpectedDuration()));
311        printField(body, EmailField.ACTUAL_DURATION.toString(), getDurationInMins(event.getActualDuration()));
312
313        try {
314            msg.setText(body.toString());
315        }
316        catch (MessagingException me) {
317            LOG.error("Message Exception in setting message body of SLA alert email", me);
318            throw me;
319        }
320    }
321
322    private long getDurationInMins(long duration) {
323        if (duration < 0) {
324            return duration;
325        }
326        return duration / 60000; //Convert millis to minutes
327    }
328
329    private String getJobLink(String jobId) {
330        StringBuffer url = new StringBuffer();
331        String param = "/?job=";
332        url.append(oozieBaseUrl);
333        url.append(param);
334        url.append(jobId);
335        return url.toString();
336    }
337
338    private void printField(StringBuilder st, String name, Object value) {
339        String lineFeed = "\n";
340        if (value != null) {
341            st.append(EMAIL_BODY_FIELD_INDENT);
342            st.append(name);
343            st.append(EMAIL_BODY_FIELD_SEPARATER);
344            st.append(value);
345            st.append(lineFeed);
346        }
347    }
348
349    private void printHeading(StringBuilder st, String header) {
350        st.append(header);
351        st.append(EMAIL_BODY_HEADER_SEPARATER);
352        st.append("\n");
353    }
354
355    private void sendEmail(Message message) throws MessagingException {
356        try {
357            Transport.send(message);
358        }
359        catch (NoSuchProviderException se) {
360            LOG.error("Could not find an SMTP transport provider to email", se);
361            throw se;
362        }
363        catch (MessagingException me) {
364            LOG.error("Message Exception in transporting SLA alert email", me);
365            if (me instanceof SendFailedException) {
366                Address[] invalidAddrs = ((SendFailedException) me).getInvalidAddresses();
367                if (invalidAddrs != null && invalidAddrs.length > 0) {
368                    for (Address addr : invalidAddrs) {
369                        try {
370                            // 'get' method loads key into cache when it doesn't exist
371                            AtomicInteger val = blackList.get(addr.toString());
372                            val.incrementAndGet();
373                        }
374                        catch (Exception e) {
375                            LOG.debug("blacklist loading throwed exception");
376                        }
377                    }
378                }
379            }
380            throw me;
381        }
382    }
383
384    @VisibleForTesting
385    public void addBlackList(String email) throws Exception {
386        // this is for testing
387        if(email == null || email.equals("")){
388            return;
389        }
390        AtomicInteger val = blackList.get(email);
391        val.set(blacklistFailCount);
392    }
393
394    @Override
395    public void onStartMet(SLAEvent work) {
396    }
397
398    @Override
399    public void onEndMet(SLAEvent work) {
400    }
401
402    @Override
403    public void onDurationMet(SLAEvent work) {
404    }
405
406}