001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.coord;
019
020import java.net.URI;
021import java.util.ArrayList;
022import java.util.Calendar;
023import java.util.Date;
024import java.util.List;
025import java.util.TimeZone;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.client.OozieClient;
029import org.apache.oozie.dependency.URIHandler.Context;
030import org.apache.oozie.dependency.URIHandler;
031import org.apache.oozie.util.DateUtils;
032import org.apache.oozie.util.ELEvaluator;
033import org.apache.oozie.util.ParamChecker;
034import org.apache.oozie.util.XLog;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.service.URIHandlerService;
037
038/**
039 * This class implements the EL function related to coordinator
040 */
041
042public class CoordELFunctions {
043    final public static String DATASET = "oozie.coord.el.dataset.bean";
044    final public static String COORD_ACTION = "oozie.coord.el.app.bean";
045    final public static String CONFIGURATION = "oozie.coord.el.conf";
046    final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
047    // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
048    final public static String INSTANCE_SEPARATOR = "#";
049    final public static String DIR_SEPARATOR = ",";
050    // TODO: in next release, support flexibility
051    private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS";
052
053    /**
054     * Used in defining the frequency in 'day' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
055     *
056     * @param val frequency in number of days.
057     * @return number of days and also set the frequency timeunit to "day"
058     */
059    public static int ph1_coord_days(int val) {
060        val = ParamChecker.checkGTZero(val, "n");
061        ELEvaluator eval = ELEvaluator.getCurrent();
062        eval.setVariable("timeunit", TimeUnit.DAY);
063        eval.setVariable("endOfDuration", TimeUnit.NONE);
064        return val;
065    }
066
067    /**
068     * Used in defining the frequency in 'month' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
069     *
070     * @param val frequency in number of months.
071     * @return number of months and also set the frequency timeunit to "month"
072     */
073    public static int ph1_coord_months(int val) {
074        val = ParamChecker.checkGTZero(val, "n");
075        ELEvaluator eval = ELEvaluator.getCurrent();
076        eval.setVariable("timeunit", TimeUnit.MONTH);
077        eval.setVariable("endOfDuration", TimeUnit.NONE);
078        return val;
079    }
080
081    /**
082     * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val &gt; 0</code> and should
083     * be integer.
084     *
085     * @param val frequency in number of hours.
086     * @return number of minutes and also set the frequency timeunit to "minute"
087     */
088    public static int ph1_coord_hours(int val) {
089        val = ParamChecker.checkGTZero(val, "n");
090        ELEvaluator eval = ELEvaluator.getCurrent();
091        eval.setVariable("timeunit", TimeUnit.MINUTE);
092        eval.setVariable("endOfDuration", TimeUnit.NONE);
093        return val * 60;
094    }
095
096    /**
097     * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val &gt; 0</code> and should be integer.
098     *
099     * @param val frequency in number of minutes.
100     * @return number of minutes and also set the frequency timeunit to "minute"
101     */
102    public static int ph1_coord_minutes(int val) {
103        val = ParamChecker.checkGTZero(val, "n");
104        ELEvaluator eval = ELEvaluator.getCurrent();
105        eval.setVariable("timeunit", TimeUnit.MINUTE);
106        eval.setVariable("endOfDuration", TimeUnit.NONE);
107        return val;
108    }
109
110    /**
111     * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will
112     * start at 00:00 hour of each day. <p/> domain: <code> val &gt; 0</code> and should be integer.
113     *
114     * @param val frequency in number of days.
115     * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day"
116     */
117    public static int ph1_coord_endOfDays(int val) {
118        val = ParamChecker.checkGTZero(val, "n");
119        ELEvaluator eval = ELEvaluator.getCurrent();
120        eval.setVariable("timeunit", TimeUnit.DAY);
121        eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY);
122        return val;
123    }
124
125    /**
126     * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will
127     * start at first day of each month at 00:00 hour. <p/> domain: <code> val &gt; 0</code> and should be integer.
128     *
129     * @param val: frequency in number of months.
130     * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month"
131     */
132    public static int ph1_coord_endOfMonths(int val) {
133        val = ParamChecker.checkGTZero(val, "n");
134        ELEvaluator eval = ELEvaluator.getCurrent();
135        eval.setVariable("timeunit", TimeUnit.MONTH);
136        eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH);
137        return val;
138    }
139
140    /**
141     * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/>
142     * 1. Timezone of both dataset and job <p/> 2. Action creation Time
143     *
144     * @return difference in minutes (DataSet TZ Offset - Application TZ offset)
145     */
146    public static int ph2_coord_tzOffset() {
147        long actionCreationTime = getActionCreationtime().getTime();
148        TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ");
149        TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ");
150        return (dsTZ.getOffset(actionCreationTime) - jobTZ.getOffset(actionCreationTime)) / (1000 * 60);
151    }
152
153    public static int ph3_coord_tzOffset() {
154        return ph2_coord_tzOffset();
155    }
156
157    /**
158     * Returns the a date string while given a base date in 'strBaseDate',
159     * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH).
160     *
161     * @param strBaseDate -- base date
162     * @param offset -- any number
163     * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH
164     * @return date string
165     * @throws Exception
166     */
167    public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
168        Calendar baseCalDate = DateUtils.getCalendar(strBaseDate);
169        StringBuilder buffer = new StringBuilder();
170        baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset);
171        buffer.append(DateUtils.formatDateOozieTZ(baseCalDate));
172        return buffer.toString();
173    }
174
175    public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception {
176        return ph2_coord_dateOffset(strBaseDate, offset, unit);
177    }
178
179    /**
180     * Determine the date-time in Oozie processing timezone of n-th future available dataset instance
181     * from nominal Time but not beyond the instance specified as 'instance.
182     * <p/>
183     * It depends on:
184     * <p/>
185     * 1. Data set frequency
186     * <p/>
187     * 2. Data set Time unit (day, month, minute)
188     * <p/>
189     * 3. Data set Time zone/DST
190     * <p/>
191     * 4. End Day/Month flag
192     * <p/>
193     * 5. Data set initial instance
194     * <p/>
195     * 6. Action Creation Time
196     * <p/>
197     * 7. Existence of dataset's directory
198     *
199     * @param n :instance count
200     *        <p/>
201     *        domain: n >= 0, n is integer
202     * @param instance: How many future instance it should check? value should
203     *        be >=0
204     * @return date-time in Oozie processing timezone of the n-th instance
205     *         <p/>
206     * @throws Exception
207     */
208    public static String ph3_coord_future(int n, int instance) throws Exception {
209        ParamChecker.checkGEZero(n, "future:n");
210        ParamChecker.checkGTZero(instance, "future:instance");
211        if (isSyncDataSet()) {// For Sync Dataset
212            return coord_future_sync(n, instance);
213        }
214        else {
215            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
216        }
217    }
218
219    /**
220     * Determine the date-time in Oozie processing timezone of the future available dataset instances
221     * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'.
222     * <p/>
223     * It depends on:
224     * <p/>
225     * 1. Data set frequency
226     * <p/>
227     * 2. Data set Time unit (day, month, minute)
228     * <p/>
229     * 3. Data set Time zone/DST
230     * <p/>
231     * 4. End Day/Month flag
232     * <p/>
233     * 5. Data set initial instance
234     * <p/>
235     * 6. Action Creation Time
236     * <p/>
237     * 7. Existence of dataset's directory
238     *
239     * @param start : start instance offset
240     *        <p/>
241     *        domain: start >= 0, start is integer
242     * @param end : end instance offset
243     *        <p/>
244     *        domain: end >= 0, end is integer
245     * @param instance: How many future instance it should check? value should
246     *        be >=0
247     * @return date-time in Oozie processing timezone of the instances from start to end offsets
248     *        delimited by comma.
249     *         <p/>
250     * @throws Exception
251     */
252    public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception {
253        ParamChecker.checkGEZero(start, "future:n");
254        ParamChecker.checkGEZero(end, "future:n");
255        ParamChecker.checkGTZero(instance, "future:instance");
256        if (isSyncDataSet()) {// For Sync Dataset
257            return coord_futureRange_sync(start, end, instance);
258        }
259        else {
260            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
261        }
262    }
263
264    private static String coord_future_sync(int n, int instance) throws Exception {
265        return coord_futureRange_sync(n, n, instance);
266    }
267
268    private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
269        final XLog LOG = XLog.getLog(CoordELFunctions.class);
270        final Thread currentThread = Thread.currentThread();
271        ELEvaluator eval = ELEvaluator.getCurrent();
272        String retVal = "";
273        int datasetFrequency = (int) getDSFrequency();// in minutes
274        TimeUnit dsTimeUnit = getDSTimeUnit();
275        int[] instCount = new int[1];
276        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
277        StringBuilder resolvedInstances = new StringBuilder();
278        StringBuilder resolvedURIPaths = new StringBuilder();
279        if (nominalInstanceCal != null) {
280            Calendar initInstance = getInitialInstanceCal();
281            nominalInstanceCal = (Calendar) initInstance.clone();
282            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
283
284            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
285            if (ds == null) {
286                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
287            }
288            String uriTemplate = ds.getUriTemplate();
289            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
290            if (conf == null) {
291                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
292            }
293            int available = 0, checkedInstance = 0;
294            boolean resolved = false;
295            String user = ParamChecker
296                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
297            String doneFlag = ds.getDoneFlag();
298            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
299            URIHandler uriHandler = null;
300            Context uriContext = null;
301            try {
302                while (instance >= checkedInstance && !currentThread.isInterrupted()) {
303                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
304                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
305                    if (uriHandler == null) {
306                        URI uri = new URI(uriPath);
307                        uriHandler = uriService.getURIHandler(uri);
308                        uriContext = uriHandler.getContext(uri, conf, user);
309                    }
310                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
311                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
312                        if (available == endOffset) {
313                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
314                            resolved = true;
315                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
316                            resolvedURIPaths.append(uriPath);
317                            retVal = resolvedInstances.toString();
318                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
319                            break;
320                        }
321                        else if (available >= startOffset) {
322                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
323                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
324                                    INSTANCE_SEPARATOR);
325                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
326                        }
327                        available++;
328                    }
329                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
330                    nominalInstanceCal = (Calendar) initInstance.clone();
331                    instCount[0]++;
332                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
333                    checkedInstance++;
334                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
335                }
336            }
337            finally {
338                if (uriContext != null) {
339                    uriContext.destroy();
340                }
341            }
342            if (!resolved) {
343                // return unchanged future function with variable 'is_resolved'
344                // to 'false'
345                eval.setVariable("is_resolved", Boolean.FALSE);
346                if (startOffset == endOffset) {
347                    retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
348                }
349                else {
350                    retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
351                }
352            }
353            else {
354                eval.setVariable("is_resolved", Boolean.TRUE);
355            }
356        }
357        else {// No feasible nominal time
358            eval.setVariable("is_resolved", Boolean.TRUE);
359            retVal = "";
360        }
361        return retVal;
362    }
363
364    /**
365     * Return nominal time or Action Creation Time.
366     * <p/>
367     *
368     * @return coordinator action creation or materialization date time
369     * @throws Exception if unable to format the Date object to String
370     */
371    public static String ph2_coord_nominalTime() throws Exception {
372        ELEvaluator eval = ELEvaluator.getCurrent();
373        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
374                "Coordinator Action");
375        return DateUtils.formatDateOozieTZ(action.getNominalTime());
376    }
377
378    public static String ph3_coord_nominalTime() throws Exception {
379        return ph2_coord_nominalTime();
380    }
381
382    /**
383     * Convert from standard date-time formatting to a desired format.
384     * <p/>
385     * @param dateTimeStr - A timestamp in standard (ISO8601) format.
386     * @param format - A string representing the desired format.
387     * @return coordinator action creation or materialization date time
388     * @throws Exception if unable to format the Date object to String
389     */
390    public static String ph2_coord_formatTime(String dateTimeStr, String format)
391        throws Exception {
392        Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr);
393        return DateUtils.formatDateCustom(dateTime, format);
394    }
395
396    public static String ph3_coord_formatTime(String dateTimeStr, String format)
397        throws Exception {
398        return ph2_coord_formatTime(dateTimeStr, format);
399    }
400
401    /**
402     * Return Action Id. <p/>
403     *
404     * @return coordinator action Id
405     */
406    public static String ph2_coord_actionId() throws Exception {
407        ELEvaluator eval = ELEvaluator.getCurrent();
408        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
409                "Coordinator Action");
410        return action.getActionId();
411    }
412
413    public static String ph3_coord_actionId() throws Exception {
414        return ph2_coord_actionId();
415    }
416
417    /**
418     * Return Job Name. <p/>
419     *
420     * @return coordinator name
421     */
422    public static String ph2_coord_name() throws Exception {
423        ELEvaluator eval = ELEvaluator.getCurrent();
424        SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION),
425                "Coordinator Action");
426        return action.getName();
427    }
428
429    public static String ph3_coord_name() throws Exception {
430        return ph2_coord_name();
431    }
432
433    /**
434     * Return Action Start time. <p/>
435     *
436     * @return coordinator action start time
437     * @throws Exception if unable to format the Date object to String
438     */
439    public static String ph2_coord_actualTime() throws Exception {
440        ELEvaluator eval = ELEvaluator.getCurrent();
441        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
442        if (coordAction == null) {
443            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
444        }
445        return DateUtils.formatDateOozieTZ(coordAction.getActualTime());
446    }
447
448    public static String ph3_coord_actualTime() throws Exception {
449        return ph2_coord_actualTime();
450    }
451
452    /**
453     * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level
454     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
455     * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
456     * unresolved, this function will echo back the original function <p/> otherwise it sends the uris.
457     *
458     * @param dataInName : Datain name
459     * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest)
460     *         , echo back <p/> the function without resolving the function.
461     */
462    public static String ph3_coord_dataIn(String dataInName) {
463        String uris = "";
464        ELEvaluator eval = ELEvaluator.getCurrent();
465        uris = (String) eval.getVariable(".datain." + dataInName);
466        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
467        if (unresolved != null && unresolved.booleanValue() == true) {
468            return "${coord:dataIn('" + dataInName + "')}";
469        }
470        return uris;
471    }
472
473    /**
474     * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level
475     * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris.
476     *
477     * @param dataOutName : Dataout name
478     * @return the list of URI's separated by INSTANCE_SEPARATOR
479     */
480    public static String ph3_coord_dataOut(String dataOutName) {
481        String uris = "";
482        ELEvaluator eval = ELEvaluator.getCurrent();
483        uris = (String) eval.getVariable(".dataout." + dataOutName);
484        return uris;
485    }
486
487    /**
488     * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1.
489     * Data set frequency <p/> 2.
490     * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data
491     * set initial instance <p/> 6. Action Creation Time
492     *
493     * @param n instance count domain: n is integer
494     * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is
495     * earlier than Initial-Instance of DS
496     * @throws Exception
497     */
498    public static String ph2_coord_current(int n) throws Exception {
499        if (isSyncDataSet()) { // For Sync Dataset
500            return coord_current_sync(n);
501        }
502        else {
503            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
504        }
505    }
506
507    /**
508     * Determine the date-time in Oozie processing timezone of current dataset instances
509     * from start to end offsets from the nominal time. <p/> It depends
510     * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
511     * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time
512     *
513     * @param start :start instance offset <p/> domain: start <= 0, start is integer
514     * @param end :end instance offset <p/> domain: end <= 0, end is integer
515     * @return date-time in Oozie processing timezone of the instances from start to end offsets
516     *        delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time
517     *        is earlier than the Initial-Instance of DS an empty string is returned.
518     *        If an instance within the range is earlier than Initial-Instance of DS that instance is ignored
519     * @throws Exception
520     */
521    public static String ph2_coord_currentRange(int start, int end) throws Exception {
522        if (isSyncDataSet()) { // For Sync Dataset
523            return coord_currentRange_sync(start, end);
524        }
525        else {
526            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
527        }
528    }
529
530    /**
531     * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It
532     * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST
533     * <p/> 4. Data set initial instance <p/> 5. Action Creation Time
534     *
535     * @param n offset amount (integer)
536     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
537     * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time
538     * @throws Exception if there was a problem formatting
539     */
540    public static String ph2_coord_offset(int n, String timeUnit) throws Exception {
541        if (isSyncDataSet()) { // For Sync Dataset
542            return coord_offset_sync(n, timeUnit);
543        }
544        else {
545            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
546        }
547    }
548
549    /**
550     * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency
551     * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
552     * Data set initial instance <p/> 6. Action Creation Time
553     *
554     * @param n instance count <p/> domain: n is integer
555     * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
556     * @throws Exception
557     */
558    public static int ph2_coord_hoursInDay(int n) throws Exception {
559        int datasetFrequency = (int) getDSFrequency();
560        // /Calendar nominalInstanceCal =
561        // getCurrentInstance(getActionCreationtime());
562        Calendar nominalInstanceCal = getEffectiveNominalTime();
563        if (nominalInstanceCal == null) {
564            return -1;
565        }
566        nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
567        /*
568         * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
569         * { return -1; }
570         */
571        nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
572        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
573        return DateUtils.hoursInDay(nominalInstanceCal);
574    }
575
576    public static int ph3_coord_hoursInDay(int n) throws Exception {
577        return ph2_coord_hoursInDay(n);
578    }
579
580    /**
581     * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency .
582     * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5.
583     * Data set initial instance <p/> 6. Action Creation Time
584     *
585     * @param n instance count. domain: n is integer
586     * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS
587     * @throws Exception
588     */
589    public static int ph2_coord_daysInMonth(int n) throws Exception {
590        int datasetFrequency = (int) getDSFrequency();// in minutes
591        // Calendar nominalInstanceCal =
592        // getCurrentInstance(getActionCreationtime());
593        Calendar nominalInstanceCal = getEffectiveNominalTime();
594        if (nominalInstanceCal == null) {
595            return -1;
596        }
597        nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n);
598        /*
599         * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0)
600         * { return -1; }
601         */
602        nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ
603        // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
604        return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH);
605    }
606
607    public static int ph3_coord_daysInMonth(int n) throws Exception {
608        return ph2_coord_daysInMonth(n);
609    }
610
611    /**
612     * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends
613     * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
614     * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
615     * dataset's directory
616     *
617     * @param n :instance count <p/> domain: n <= 0, n is integer
618     * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is
619     * earlier than Initial-Instance of DS
620     * @throws Exception
621     */
622    public static String ph3_coord_latest(int n) throws Exception {
623        ParamChecker.checkLEZero(n, "latest:n");
624        if (isSyncDataSet()) {// For Sync Dataset
625            return coord_latest_sync(n);
626        }
627        else {
628            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
629        }
630    }
631
632    /**
633     * Determine the date-time in Oozie processing timezone of latest available dataset instances
634     * from start to end offsets from the nominal time. <p/> It depends
635     * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST
636     * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of
637     * dataset's directory
638     *
639     * @param start :start instance offset <p/> domain: start <= 0, start is integer
640     * @param end :end instance offset <p/> domain: end <= 0, end is integer
641     * @return date-time in Oozie processing timezone of the instances from start to end offsets
642     *        delimited by comma. <p/> returns 'null' means start offset instance is
643     *        earlier than Initial-Instance of DS
644     * @throws Exception
645     */
646    public static String ph3_coord_latestRange(int start, int end) throws Exception {
647        ParamChecker.checkLEZero(start, "latest:n");
648        ParamChecker.checkLEZero(end, "latest:n");
649        if (isSyncDataSet()) {// For Sync Dataset
650            return coord_latestRange_sync(start, end);
651        }
652        else {
653            throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet");
654        }
655    }
656
657    /**
658     * Configure an evaluator with data set and application specific information. <p/> Helper method of associating
659     * dataset and application object
660     *
661     * @param evaluator : to set variables
662     * @param ds : Data Set object
663     * @param coordAction : Application instance
664     */
665    public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) {
666        evaluator.setVariable(COORD_ACTION, coordAction);
667        evaluator.setVariable(DATASET, ds);
668    }
669
670    /**
671     * Helper method to wrap around with "${..}". <p/>
672     *
673     *
674     * @param eval :EL evaluator
675     * @param expr : expression to evaluate
676     * @return Resolved expression or echo back the same expression
677     * @throws Exception
678     */
679    public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception {
680        try {
681            eval.setVariable(".wrap", null);
682            String result = eval.evaluate(expr, String.class);
683            if (eval.getVariable(".wrap") != null) {
684                return "${" + result + "}";
685            }
686            else {
687                return result;
688            }
689        }
690        catch (Exception e) {
691            throw new Exception("Unable to evaluate :" + expr + ":\n", e);
692        }
693    }
694
695    // Set of echo functions
696
697    public static String ph1_coord_current_echo(String n) {
698        return echoUnResolved("current", n);
699    }
700
701    public static String ph1_coord_currentRange_echo(String start, String end) {
702        return echoUnResolved("currentRange", start + ", " + end);
703    }
704
705    public static String ph1_coord_offset_echo(String n, String timeUnit) {
706        return echoUnResolved("offset", n + " , " + timeUnit);
707    }
708
709    public static String ph2_coord_current_echo(String n) {
710        return echoUnResolved("current", n);
711    }
712
713    public static String ph2_coord_currentRange_echo(String start, String end) {
714        return echoUnResolved("currentRange", start + ", " + end);
715    }
716
717    public static String ph2_coord_offset_echo(String n, String timeUnit) {
718        return echoUnResolved("offset", n + " , " + timeUnit);
719    }
720
721    public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) {
722        return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit);
723    }
724
725    public static String ph1_coord_formatTime_echo(String dateTime, String format) {
726        // Quote the dateTime value since it would contain a ':'.
727        return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format);
728    }
729
730    public static String ph1_coord_latest_echo(String n) {
731        return echoUnResolved("latest", n);
732    }
733
734    public static String ph2_coord_latest_echo(String n) {
735        return ph1_coord_latest_echo(n);
736    }
737
738    public static String ph1_coord_future_echo(String n, String instance) {
739        return echoUnResolved("future", n + ", " + instance + "");
740    }
741
742    public static String ph2_coord_future_echo(String n, String instance) {
743        return ph1_coord_future_echo(n, instance);
744    }
745
746    public static String ph1_coord_latestRange_echo(String start, String end) {
747        return echoUnResolved("latestRange", start + ", " + end);
748    }
749
750    public static String ph2_coord_latestRange_echo(String start, String end) {
751        return ph1_coord_latestRange_echo(start, end);
752    }
753
754    public static String ph1_coord_futureRange_echo(String start, String end, String instance) {
755        return echoUnResolved("futureRange", start + ", " + end + ", " + instance);
756    }
757
758    public static String ph2_coord_futureRange_echo(String start, String end, String instance) {
759        return ph1_coord_futureRange_echo(start, end, instance);
760    }
761
762    public static String ph1_coord_dataIn_echo(String n) {
763        ELEvaluator eval = ELEvaluator.getCurrent();
764        String val = (String) eval.getVariable("oozie.dataname." + n);
765        if (val == null || val.equals("data-in") == false) {
766            XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid");
767            throw new RuntimeException("data_in_name " + n + " is not valid");
768        }
769        return echoUnResolved("dataIn", "'" + n + "'");
770    }
771
772    public static String ph1_coord_dataOut_echo(String n) {
773        ELEvaluator eval = ELEvaluator.getCurrent();
774        String val = (String) eval.getVariable("oozie.dataname." + n);
775        if (val == null || val.equals("data-out") == false) {
776            XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid");
777            throw new RuntimeException("data_out_name " + n + " is not valid");
778        }
779        return echoUnResolved("dataOut", "'" + n + "'");
780    }
781
782    public static String ph1_coord_nominalTime_echo() {
783        return echoUnResolved("nominalTime", "");
784    }
785
786    public static String ph1_coord_nominalTime_echo_wrap() {
787        // return "${coord:nominalTime()}"; // no resolution
788        return echoUnResolved("nominalTime", "");
789    }
790
791    public static String ph1_coord_nominalTime_echo_fixed() {
792        return "2009-03-06T010:00"; // Dummy resolution
793    }
794
795    public static String ph1_coord_actualTime_echo_wrap() {
796        // return "${coord:actualTime()}"; // no resolution
797        return echoUnResolved("actualTime", "");
798    }
799
800    public static String ph1_coord_actionId_echo() {
801        return echoUnResolved("actionId", "");
802    }
803
804    public static String ph1_coord_name_echo() {
805        return echoUnResolved("name", "");
806    }
807
808    // The following echo functions are not used in any phases yet
809    // They are here for future purpose.
810    public static String coord_minutes_echo(String n) {
811        return echoUnResolved("minutes", n);
812    }
813
814    public static String coord_hours_echo(String n) {
815        return echoUnResolved("hours", n);
816    }
817
818    public static String coord_days_echo(String n) {
819        return echoUnResolved("days", n);
820    }
821
822    public static String coord_endOfDay_echo(String n) {
823        return echoUnResolved("endOfDay", n);
824    }
825
826    public static String coord_months_echo(String n) {
827        return echoUnResolved("months", n);
828    }
829
830    public static String coord_endOfMonth_echo(String n) {
831        return echoUnResolved("endOfMonth", n);
832    }
833
834    public static String coord_actualTime_echo() {
835        return echoUnResolved("actualTime", "");
836    }
837
838    // This echo function will always return "24" for validation only.
839    // This evaluation ****should not**** replace the original XML
840    // Create a temporary string and validate the function
841    // This is **required** for evaluating an expression like
842    // coord:HoursInDay(0) + 3
843    // actual evaluation will happen in phase 2 or phase 3.
844    public static String ph1_coord_hoursInDay_echo(String n) {
845        return "24";
846        // return echoUnResolved("hoursInDay", n);
847    }
848
849    // This echo function will always return "30" for validation only.
850    // This evaluation ****should not**** replace the original XML
851    // Create a temporary string and validate the function
852    // This is **required** for evaluating an expression like
853    // coord:daysInMonth(0) + 3
854    // actual evaluation will happen in phase 2 or phase 3.
855    public static String ph1_coord_daysInMonth_echo(String n) {
856        // return echoUnResolved("daysInMonth", n);
857        return "30";
858    }
859
860    // This echo function will always return "3" for validation only.
861    // This evaluation ****should not**** replace the original XML
862    // Create a temporary string and validate the function
863    // This is **required** for evaluating an expression like coord:tzOffset + 2
864    // actual evaluation will happen in phase 2 or phase 3.
865    public static String ph1_coord_tzOffset_echo() {
866        // return echoUnResolved("tzOffset", "");
867        return "3";
868    }
869
870    // Local methods
871    /**
872     * @param n
873     * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the
874     *         Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset.
875     * @throws Exception
876     */
877    private static String coord_current_sync(int n) throws Exception {
878        return coord_currentRange_sync(n, n);
879    }
880
881    private static String coord_currentRange_sync(int start, int end) throws Exception {
882        final XLog LOG = XLog.getLog(CoordELFunctions.class);
883        int datasetFrequency = getDSFrequency();// in minutes
884        TimeUnit dsTimeUnit = getDSTimeUnit();
885        int[] instCount = new int[1];// used as pass by ref
886        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
887        StringBuilder instanceList = new StringBuilder();
888        if (nominalInstanceCal == null) {
889            LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
890                    + " returned. This means that no data is available at the current-instance specified by the user"
891                    + " and the user could try modifying his initial-instance to an earlier time.");
892            return "";
893        } else {
894            Calendar initInstance = getInitialInstanceCal();
895            instCount[0] = instCount[0] + end;
896            // Add in the reverse order - newest instance first.
897            for (int i = end; i >= start; i--) {
898                // Tried to avoid the clone. But subtracting datasetFrequency gives different results than multiplying
899                // and Spring DST transition test in TestCoordELfunctions.testCurrent() fails
900                //nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
901                nominalInstanceCal = (Calendar) initInstance.clone();
902                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
903                instCount[0]--;
904                if (nominalInstanceCal.compareTo(initInstance) < 0) {
905                    LOG.warn("If the initial instance of the dataset is later than the current-instance specified,"
906                            + " such as coord:current({0}) in this case, an empty string is returned. This means that"
907                            + " no data is available at the current-instance specified by the user and the user could"
908                            + " try modifying his initial-instance to an earlier time.", start);
909                    break;
910                }
911                else {
912                    instanceList.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
913                    instanceList.append(CoordELFunctions.INSTANCE_SEPARATOR);
914                }
915            }
916        }
917
918        if (instanceList.length() > 0) {
919            instanceList.setLength(instanceList.length() - CoordELFunctions.INSTANCE_SEPARATOR.length());
920        }
921        return instanceList.toString();
922    }
923
924    /**
925     *
926     * @param n offset amount (integer)
927     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
928     * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the
929     *         offset instance <p/> is earlier than the Initial_Instance of dataset.
930     * @throws Exception
931     */
932    private static String coord_offset_sync(int n, String timeUnit) throws Exception {
933        Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null);
934        if (rawCal == null) {
935            // warning already logged by resolveOffsetRawTime()
936            return "";
937        }
938
939        int freq = getDSFrequency();
940        TimeUnit freqUnit = getDSTimeUnit();
941        int freqCount = 0;
942        // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same
943        // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time
944        // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true
945        // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that.
946        Calendar cal = getInitialInstanceCal();
947        if (rawCal.before(cal)) {
948            while (cal.after(rawCal)) {
949                cal.add(freqUnit.getCalendarUnit(), -freq);
950                freqCount--;
951            }
952        }
953        else if (rawCal.after(cal)) {
954            while (cal.before(rawCal)) {
955                cal.add(freqUnit.getCalendarUnit(), freq);
956                freqCount++;
957            }
958        }
959        if (cal.before(rawCal)) {
960            rawCal = cal;
961        }
962        else if (cal.after(rawCal)) {
963            cal.add(freqUnit.getCalendarUnit(), -freq);
964            rawCal = cal;
965            freqCount--;
966        }
967        String rawCalStr = DateUtils.formatDateOozieTZ(rawCal);
968
969        Calendar nominalInstanceCal = getInitialInstanceCal();
970        nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount);
971        if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) {
972            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance"
973                    + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no"
974                    + " data is available at the offset instance specified by the user and the user could try modifying his"
975                    + " initial-instance to an earlier time.", n, timeUnit);
976            return "";
977        }
978        String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal);
979
980        if (!rawCalStr.equals(nominalCalStr)) {
981            throw new RuntimeException("Shouldn't happen");
982        }
983        return rawCalStr;
984    }
985
986    /**
987     * @param offset
988     * @return n-th available latest instance Date-Time for SYNC data-set
989     * @throws Exception
990     */
991    private static String coord_latest_sync(int offset) throws Exception {
992        return coord_latestRange_sync(offset, offset);
993    }
994
995    private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
996        final XLog LOG = XLog.getLog(CoordELFunctions.class);
997        final Thread currentThread = Thread.currentThread();
998        ELEvaluator eval = ELEvaluator.getCurrent();
999        String retVal = "";
1000        int datasetFrequency = (int) getDSFrequency();// in minutes
1001        TimeUnit dsTimeUnit = getDSTimeUnit();
1002        int[] instCount = new int[1];
1003        boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
1004        Calendar nominalInstanceCal;
1005        if (useCurrentTime) {
1006            nominalInstanceCal = getCurrentInstance(new Date(), instCount);
1007        }
1008        else {
1009            nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
1010        }
1011        StringBuilder resolvedInstances = new StringBuilder();
1012        StringBuilder resolvedURIPaths = new StringBuilder();
1013        if (nominalInstanceCal != null) {
1014            Calendar initInstance = getInitialInstanceCal();
1015            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1016            if (ds == null) {
1017                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1018            }
1019            String uriTemplate = ds.getUriTemplate();
1020            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
1021            if (conf == null) {
1022                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
1023            }
1024            int available = 0;
1025            boolean resolved = false;
1026            String user = ParamChecker
1027                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
1028            String doneFlag = ds.getDoneFlag();
1029            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
1030            URIHandler uriHandler = null;
1031            Context uriContext = null;
1032            try {
1033                while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
1034                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
1035                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
1036                    if (uriHandler == null) {
1037                        URI uri = new URI(uriPath);
1038                        uriHandler = uriService.getURIHandler(uri);
1039                        uriContext = uriHandler.getContext(uri, conf, user);
1040                    }
1041                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
1042                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
1043                        XLog.getLog(CoordELFunctions.class)
1044                                .debug("Found latest(" + available + "): " + uriWithDoneFlag);
1045                        if (available == startOffset) {
1046                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
1047                            resolved = true;
1048                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
1049                            resolvedURIPaths.append(uriPath);
1050                            retVal = resolvedInstances.toString();
1051                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
1052                            break;
1053                        }
1054                        else if (available <= endOffset) {
1055                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
1056                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
1057                                    INSTANCE_SEPARATOR);
1058                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
1059                        }
1060
1061                        available--;
1062                    }
1063                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
1064                    nominalInstanceCal = (Calendar) initInstance.clone();
1065                    instCount[0]--;
1066                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
1067                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
1068                }
1069            }
1070            finally {
1071                if (uriContext != null) {
1072                    uriContext.destroy();
1073                }
1074            }
1075            if (!resolved) {
1076                // return unchanged latest function with variable 'is_resolved'
1077                // to 'false'
1078                eval.setVariable("is_resolved", Boolean.FALSE);
1079                if (startOffset == endOffset) {
1080                    retVal = "${coord:latest(" + startOffset + ")}";
1081                }
1082                else {
1083                    retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
1084                }
1085            }
1086            else {
1087                eval.setVariable("is_resolved", Boolean.TRUE);
1088            }
1089        }
1090        else {// No feasible nominal time
1091            eval.setVariable("is_resolved", Boolean.FALSE);
1092        }
1093        return retVal;
1094    }
1095
1096    /**
1097     * @param tm
1098     * @return a new Evaluator to be used for URI-template evaluation
1099     */
1100    private static ELEvaluator getUriEvaluator(Calendar tm) {
1101        tm.setTimeZone(DateUtils.getOozieProcessingTimeZone());
1102        ELEvaluator retEval = new ELEvaluator();
1103        retEval.setVariable("YEAR", tm.get(Calendar.YEAR));
1104        retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm
1105                .get(Calendar.MONTH) + 1));
1106        retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm
1107                .get(Calendar.DAY_OF_MONTH));
1108        retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm
1109                .get(Calendar.HOUR_OF_DAY));
1110        retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm
1111                .get(Calendar.MINUTE));
1112        return retEval;
1113    }
1114
1115    /**
1116     * @return whether a data set is SYNCH or ASYNC
1117     */
1118    private static boolean isSyncDataSet() {
1119        ELEvaluator eval = ELEvaluator.getCurrent();
1120        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1121        if (ds == null) {
1122            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1123        }
1124        return ds.getType().equalsIgnoreCase("SYNC");
1125    }
1126
1127    /**
1128     * Check whether a function should be resolved.
1129     *
1130     * @param functionName
1131     * @param n
1132     * @return null if the functionName needs to be resolved otherwise return the calling function unresolved.
1133     */
1134    private static String checkIfResolved(String functionName, String n) {
1135        ELEvaluator eval = ELEvaluator.getCurrent();
1136        String replace = (String) eval.getVariable("resolve_" + functionName);
1137        if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't
1138            // resolve
1139            // return "${coord:" + functionName + "(" + n +")}"; //Unresolved
1140            eval.setVariable(".wrap", "true");
1141            return "coord:" + functionName + "(" + n + ")"; // Unresolved
1142        }
1143        return null; // Resolved it
1144    }
1145
1146    private static String echoUnResolved(String functionName, String n) {
1147        return echoUnResolvedPre(functionName, n, "coord:");
1148    }
1149
1150    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
1151        ELEvaluator eval = ELEvaluator.getCurrent();
1152        eval.setVariable(".wrap", "true");
1153        return prefix + functionName + "(" + n + ")"; // Unresolved
1154    }
1155
1156    /**
1157     * @return the initial instance of a DataSet in DATE
1158     */
1159    private static Date getInitialInstance() {
1160        ELEvaluator eval = ELEvaluator.getCurrent();
1161        return getInitialInstance(eval);
1162    }
1163
1164    /**
1165     * @return the initial instance of a DataSet in DATE
1166     */
1167    private static Date getInitialInstance(ELEvaluator eval) {
1168        return getInitialInstanceCal(eval).getTime();
1169        // return ds.getInitInstance();
1170    }
1171
1172    /**
1173     * @return the initial instance of a DataSet in Calendar
1174     */
1175    private static Calendar getInitialInstanceCal() {
1176        ELEvaluator eval = ELEvaluator.getCurrent();
1177        return getInitialInstanceCal(eval);
1178    }
1179
1180    /**
1181     * @return the initial instance of a DataSet in Calendar
1182     */
1183    private static Calendar getInitialInstanceCal(ELEvaluator eval) {
1184        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1185        if (ds == null) {
1186            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1187        }
1188        Calendar effInitTS = Calendar.getInstance();
1189        effInitTS.setTime(ds.getInitInstance());
1190        effInitTS.setTimeZone(ds.getTimeZone());
1191        // To adjust EOD/EOM
1192        DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval));
1193        return effInitTS;
1194        // return ds.getInitInstance();
1195    }
1196
1197    /**
1198     * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1199     */
1200    private static Date getActionCreationtime() {
1201        ELEvaluator eval = ELEvaluator.getCurrent();
1202        return getActionCreationtime(eval);
1203    }
1204
1205    /**
1206     * @return Nominal or action creation Time when all the dependencies of an application instance are met.
1207     */
1208    private static Date getActionCreationtime(ELEvaluator eval) {
1209        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1210        if (coordAction == null) {
1211            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1212        }
1213        return coordAction.getNominalTime();
1214    }
1215
1216    /**
1217     * @return Actual Time when all the dependencies of an application instance are met.
1218     */
1219    private static Date getActualTime() {
1220        ELEvaluator eval = ELEvaluator.getCurrent();
1221        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1222        if (coordAction == null) {
1223            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1224        }
1225        return coordAction.getActualTime();
1226    }
1227
1228    /**
1229     * @return TimeZone for the application or job.
1230     */
1231    private static TimeZone getJobTZ() {
1232        ELEvaluator eval = ELEvaluator.getCurrent();
1233        SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION);
1234        if (coordAction == null) {
1235            throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION);
1236        }
1237        return coordAction.getTimeZone();
1238    }
1239
1240    /**
1241     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1242     *
1243     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1244     *         the dataset.
1245     */
1246    public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
1247        ELEvaluator eval = ELEvaluator.getCurrent();
1248        return getCurrentInstance(effectiveTime, instanceCount, eval);
1249    }
1250
1251    /**
1252     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
1253     *
1254     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
1255     *         the dataset.
1256     */
1257    private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
1258        Date datasetInitialInstance = getInitialInstance(eval);
1259        TimeUnit dsTimeUnit = getDSTimeUnit(eval);
1260        TimeZone dsTZ = getDatasetTZ(eval);
1261        int dsFreq = getDSFrequency(eval);
1262        // Convert Date to Calendar for corresponding TZ
1263        Calendar current = Calendar.getInstance();
1264        current.setTime(datasetInitialInstance);
1265        current.setTimeZone(dsTZ);
1266
1267        Calendar calEffectiveTime = Calendar.getInstance();
1268        calEffectiveTime.setTime(effectiveTime);
1269        calEffectiveTime.setTimeZone(dsTZ);
1270        if (instanceCount == null) {    // caller doesn't care about this value
1271            instanceCount = new int[1];
1272        }
1273        instanceCount[0] = 0;
1274        if (current.compareTo(calEffectiveTime) > 0) {
1275            return null;
1276        }
1277        Calendar origCurrent = (Calendar) current.clone();
1278        while (current.compareTo(calEffectiveTime) <= 0) {
1279            current = (Calendar) origCurrent.clone();
1280            instanceCount[0]++;
1281            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1282        }
1283        instanceCount[0]--;
1284
1285        current = (Calendar) origCurrent.clone();
1286        current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
1287        return current;
1288    }
1289
1290    public static Calendar getEffectiveNominalTime() {
1291        Date datasetInitialInstance = getInitialInstance();
1292        TimeZone dsTZ = getDatasetTZ();
1293        // Convert Date to Calendar for corresponding TZ
1294        Calendar current = Calendar.getInstance();
1295        current.setTime(datasetInitialInstance);
1296        current.setTimeZone(dsTZ);
1297
1298        Calendar calEffectiveTime = Calendar.getInstance();
1299        calEffectiveTime.setTime(getActionCreationtime());
1300        calEffectiveTime.setTimeZone(dsTZ);
1301        if (current.compareTo(calEffectiveTime) > 0) {
1302            // Nominal Time < initial Instance
1303            // TODO: getClass() call doesn't work from static method.
1304            // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+
1305            // current.getTime());
1306            return null;
1307        }
1308        return calEffectiveTime;
1309    }
1310
1311    /**
1312     * @return dataset frequency in minutes
1313     */
1314    private static int getDSFrequency() {
1315        ELEvaluator eval = ELEvaluator.getCurrent();
1316        return getDSFrequency(eval);
1317    }
1318
1319    /**
1320     * @return dataset frequency in minutes
1321     */
1322    private static int getDSFrequency(ELEvaluator eval) {
1323        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1324        if (ds == null) {
1325            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1326        }
1327        return ds.getFrequency();
1328    }
1329
1330    /**
1331     * @return dataset TimeUnit
1332     */
1333    private static TimeUnit getDSTimeUnit() {
1334        ELEvaluator eval = ELEvaluator.getCurrent();
1335        return getDSTimeUnit(eval);
1336    }
1337
1338    /**
1339     * @return dataset TimeUnit
1340     */
1341    private static TimeUnit getDSTimeUnit(ELEvaluator eval) {
1342        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1343        if (ds == null) {
1344            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1345        }
1346        return ds.getTimeUnit();
1347    }
1348
1349    /**
1350     * @return dataset TimeZone
1351     */
1352    public static TimeZone getDatasetTZ() {
1353        ELEvaluator eval = ELEvaluator.getCurrent();
1354        return getDatasetTZ(eval);
1355    }
1356
1357    /**
1358     * @return dataset TimeZone
1359     */
1360    private static TimeZone getDatasetTZ(ELEvaluator eval) {
1361        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1362        if (ds == null) {
1363            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1364        }
1365        return ds.getTimeZone();
1366    }
1367
1368    /**
1369     * @return dataset TimeUnit
1370     */
1371    private static TimeUnit getDSEndOfFlag() {
1372        ELEvaluator eval = ELEvaluator.getCurrent();
1373        return getDSEndOfFlag(eval);
1374    }
1375
1376    /**
1377     * @return dataset TimeUnit
1378     */
1379    private static TimeUnit getDSEndOfFlag(ELEvaluator eval) {
1380        SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
1381        if (ds == null) {
1382            throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
1383        }
1384        return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration();
1385    }
1386
1387    /**
1388     * Return a job configuration property for the coordinator.
1389     *
1390     * @param property property name.
1391     * @return the value of the property, <code>null</code> if the property is undefined.
1392     */
1393    public static String coord_conf(String property) {
1394        ELEvaluator eval = ELEvaluator.getCurrent();
1395        return (String) eval.getVariable(property);
1396    }
1397
1398    /**
1399     * Return the user that submitted the coordinator job.
1400     *
1401     * @return the user that submitted the coordinator job.
1402     */
1403    public static String coord_user() {
1404        ELEvaluator eval = ELEvaluator.getCurrent();
1405        return (String) eval.getVariable(OozieClient.USER_NAME);
1406    }
1407
1408    /**
1409     * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur
1410     * between them.  The caller should make sure that startCal is earlier than endCal.
1411     * <p>
1412     * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal
1413     * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60
1414     *
1415     * @param startCal The earlier offset time
1416     * @param endCal The later offset time
1417     * @param eval The ELEvaluator to use; cannot be null
1418     * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal
1419     */
1420    public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) {
1421        List<Integer> expandedFreqs = new ArrayList<Integer>();
1422        // Use eval because the "current" eval isn't set
1423        int freq = getDSFrequency(eval);
1424        TimeUnit freqUnit = getDSTimeUnit(eval);
1425        Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1426        int totalFreq = 0;
1427        if (startCal.before(cal)) {
1428            while (cal.after(startCal)) {
1429                cal.add(freqUnit.getCalendarUnit(), -freq);
1430                totalFreq += -freq;
1431            }
1432            if (cal.before(startCal)) {
1433                cal.add(freqUnit.getCalendarUnit(), freq);
1434                totalFreq += freq;
1435            }
1436        }
1437        else if (startCal.after(cal)) {
1438            while (cal.before(startCal)) {
1439                cal.add(freqUnit.getCalendarUnit(), freq);
1440                totalFreq += freq;
1441            }
1442        }
1443        // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the
1444        // effective nominal time.  Now we can find all of the instances that occur between startCal and endCal, inclusive.
1445        while (cal.before(endCal) || cal.equals(endCal)) {
1446            expandedFreqs.add(totalFreq);
1447            cal.add(freqUnit.getCalendarUnit(), freq);
1448            totalFreq += freq;
1449        }
1450        return expandedFreqs;
1451    }
1452
1453    /**
1454     * Resolve the offset time from the effective nominal time
1455     *
1456     * @param n offset amount (integer)
1457     * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR")
1458     * @param eval The ELEvaluator to use; or null to use the "current" eval
1459     * @return A Calendar of the offset time
1460     */
1461    public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) {
1462        // Use eval if given (for when the "current" eval isn't set)
1463        Calendar cal;
1464        if (eval == null) {
1465            cal = getCurrentInstance(getActionCreationtime(), null);
1466        }
1467        else {
1468            cal = getCurrentInstance(getActionCreationtime(eval), null, eval);
1469        }
1470        if (cal == null) {
1471            XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an"
1472                    + " empty string is returned. This means that no data is available at the offset instance specified by the user"
1473                    + " and the user could try modifying his or her initial-instance to an earlier time.");
1474            return null;
1475        }
1476        cal.add(timeUnit.getCalendarUnit(), n);
1477        return cal;
1478    }
1479}