001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;

import com.google.common.annotations.VisibleForTesting;
037
038/**
039 * XLogStreamer streams the given log file to logWriter after applying the given filter.
040 */
041public class XLogStreamer {
042    private static XLog LOG = XLog.getLog(XLogStreamer.class);
043
044    /**
045     * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
046     * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
047     * "|") jobId appName actionId token
048     */
049    public static class Filter {
050        private Map<String, Integer> logLevels;
051        private final Map<String, String> filterParams;
052        private static List<String> parameters = new ArrayList<String>();
053        private boolean noFilter;
054        private Pattern filterPattern;
055
056        // TODO Patterns to be read from config file
057        private static final String DEFAULT_REGEX = "[^\\]]*";
058
059        public static final String ALLOW_ALL_REGEX = "(.*)";
060        private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
061        private static final String WHITE_SPACE_REGEX = "\\s+";
062        private static final String LOG_LEVEL_REGEX = "(\\w+)";
063        private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
064                + WHITE_SPACE_REGEX;
065        private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
066
067        public Filter() {
068            filterParams = new HashMap<String, String>();
069            for (int i = 0; i < parameters.size(); i++) {
070                filterParams.put(parameters.get(i), DEFAULT_REGEX);
071            }
072            logLevels = null;
073            noFilter = true;
074            filterPattern = null;
075        }
076
077        public void setLogLevel(String logLevel) {
078            if (logLevel != null && logLevel.trim().length() > 0) {
079                this.logLevels = new HashMap<String, Integer>();
080                String[] levels = logLevel.split("\\|");
081                for (int i = 0; i < levels.length; i++) {
082                    String s = levels[i].trim().toUpperCase();
083                    try {
084                        XLog.Level.valueOf(s);
085                    }
086                    catch (Exception ex) {
087                        continue;
088                    }
089                    this.logLevels.put(levels[i].toUpperCase(), 1);
090                }
091            }
092        }
093
094        public void setParameter(String filterParam, String value) {
095            if (filterParams.containsKey(filterParam)) {
096                noFilter = false;
097                filterParams.put(filterParam, value);
098            }
099        }
100
101        public static void defineParameter(String filterParam) {
102            parameters.add(filterParam);
103        }
104
105        public boolean isFilterPresent() {
106            if (noFilter && logLevels == null) {
107                return false;
108            }
109            return true;
110        }
111
112        /**
113         * Checks if the logLevel and logMessage goes through the logFilter.
114         *
115         * @param logParts
116         * @return
117         */
118        public boolean matches(ArrayList<String> logParts) {
119            String logLevel = logParts.get(0);
120            String logMessage = logParts.get(1);
121            if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
122                Matcher logMatcher = filterPattern.matcher(logMessage);
123                return logMatcher.matches();
124            }
125            else {
126                return false;
127            }
128        }
129
130        /**
131         * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
132         * logMessage if the pattern matches i.e A new log statement, else returns null.
133         *
134         * @param logLine
135         * @return Array containing log level and log message
136         */
137        public ArrayList<String> splitLogMessage(String logLine) {
138            Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
139            if (splitter.matches()) {
140                ArrayList<String> logParts = new ArrayList<String>();
141                logParts.add(splitter.group(2));// log level
142                logParts.add(splitter.group(3));// Log Message
143                return logParts;
144            }
145            else {
146                return null;
147            }
148        }
149
150        /**
151         * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
152         * assigned if no filters are set.
153         */
154        public void constructPattern() {
155            if (noFilter && logLevels == null) {
156                filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
157                return;
158            }
159            StringBuilder sb = new StringBuilder();
160            if (noFilter) {
161                sb.append("(.*)");
162            }
163            else {
164                sb.append("(.* ");
165                for (int i = 0; i < parameters.size(); i++) {
166                    sb.append(parameters.get(i) + "\\[");
167                    sb.append(filterParams.get(parameters.get(i)) + "\\] ");
168                }
169                sb.append(".*)");
170            }
171            filterPattern = Pattern.compile(sb.toString());
172        }
173
174        public static void reset() {
175            parameters.clear();
176        }
177
178        @VisibleForTesting
179        public final Map<String, String> getFilterParams() {
180          return filterParams;
181        }
182    }
183
184    private String logFile;
185    private String logPath;
186    private Filter logFilter;
187    private Writer logWriter;
188    private long logRotation;
189
190    public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
191        this.logWriter = logWriter;
192        this.logFilter = logFilter;
193        if (logFile == null) {
194            logFile = "oozie-app.log";
195        }
196        this.logFile = logFile;
197        this.logPath = logPath;
198        this.logRotation = logRotationSecs * 1000l;
199    }
200
201    /**
202     * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
203     * applying the filters.
204     *
205     * @param startTime
206     * @param endTime
207     * @throws IOException
208     */
209    public void streamLog(Date startTime, Date endTime) throws IOException {
210        long startTimeMillis = 0;
211        long endTimeMillis;
212        if (startTime != null) {
213            startTimeMillis = startTime.getTime();
214        }
215        if (endTime == null) {
216            endTimeMillis = System.currentTimeMillis();
217        }
218        else {
219            endTimeMillis = endTime.getTime();
220        }
221        File dir = new File(logPath);
222        ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
223        File file;
224        String fileName;
225        XLogReader logReader;
226        for (int i = 0; i < fileList.size(); i++) {
227            fileName = fileList.get(i).getFileName();
228            if (fileName.endsWith(".gz")) {
229                file = new File(fileName);
230                GZIPInputStream gzipInputStream = null;
231                gzipInputStream = new GZIPInputStream(new FileInputStream(file));
232                logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
233                logReader.processLog();
234                logReader.close();
235                continue;
236            }
237            InputStream ifs;
238            ifs = new FileInputStream(fileName);
239            logReader = new XLogReader(ifs, logFilter, logWriter);
240            logReader.processLog();
241            ifs.close();
242        }
243    }
244
245    /**
246     * File name along with the modified time which will be used to sort later.
247     */
248    class FileInfo implements Comparable<FileInfo> {
249        String fileName;
250        long modTime;
251
252        public FileInfo(String fileName, long modTime) {
253            this.fileName = fileName;
254            this.modTime = modTime;
255        }
256
257        public String getFileName() {
258            return fileName;
259        }
260
261        public long getModTime() {
262            return modTime;
263        }
264
265        public int compareTo(FileInfo fileInfo) {
266            long diff = this.modTime - fileInfo.modTime;
267            if (diff > 0) {
268                return 1;
269            }
270            else if (diff < 0) {
271                return -1;
272            }
273            else {
274                return 0;
275            }
276        }
277    }
278
279    /**
280     * Gets the file list that will have the logs between startTime and endTime.
281     *
282     * @param dir
283     * @param startTime
284     * @param endTime
285     * @param logRotationTime
286     * @param logFile
287     * @return List of files to be streamed
288     */
289    private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
290        String[] children = dir.list();
291        ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
292        if (children == null) {
293            return fileList;
294        }
295        else {
296            for (int i = 0; i < children.length; i++) {
297                String fileName = children[i];
298                if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
299                    continue;
300                }
301                File file = new File(dir.getAbsolutePath(), fileName);
302                if (fileName.endsWith(".gz")) {
303                    long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
304                    if (gzFileCreationTime != -1) {
305                        fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
306                    }
307                    continue;
308                }
309                long modTime = file.lastModified();
310                if (modTime < startTime) {
311                    continue;
312                }
313                if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
314                    continue;
315                }
316                fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
317            }
318        }
319        Collections.sort(fileList);
320        return fileList;
321    }
322
323    /**
324     * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part
325     * of the date
326     */
327    public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");
328
329    /**
330     * Returns the creation time of the .gz archive if it is relevant to the job
331     *
332     * @param fileName
333     * @param startTime
334     * @param endTime
335     * @return Modification time of .gz file after checking if it is relevant to the job
336     */
337    private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
338        // Default return value of -1 to exclude the file
339        long returnVal = -1;
340
341        // Include oozie.log as oozie.log.gz if it is accidentally GZipped
342        if (fileName.equals("oozie.log.gz")) {
343            LOG.warn("oozie.log has been GZipped, which is unexpected");
344            // Return a value other than -1 to include the file in list
345            returnVal = 0;
346        } else {
347            Matcher m = gzTimePattern.matcher(fileName);
348            if (m.matches() && m.groupCount() == 4) {
349                int year = Integer.parseInt(m.group(1));
350                int month = Integer.parseInt(m.group(2));
351                int day = Integer.parseInt(m.group(3));
352                int hour = Integer.parseInt(m.group(4));
353                int minute = 0;
354                Calendar calendarEntry = Calendar.getInstance();
355                calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
356                long logFileStartTime = calendarEntry.getTimeInMillis();
357                long milliSecondsPerHour = 3600000;
358                long logFileEndTime = logFileStartTime + milliSecondsPerHour;
359                /*  To check whether the log content is there in the initial or later part of the log file or
360                    the log content is contained entirely within this log file or
361                    the entire log file contains the event log where the event spans across hours
362                */
363                if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
364                        || (endTime >= logFileStartTime && endTime <= logFileEndTime)
365                        || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
366                    returnVal = logFileStartTime;
367                }
368            } else {
369                LOG.debug("Filename " + fileName + " does not match the expected format");
370                returnVal = -1;
371            }
372        }
373        return returnVal;
374    }
375}