001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.coord; 019 020import java.net.URI; 021import java.util.ArrayList; 022import java.util.Calendar; 023import java.util.Date; 024import java.util.List; 025import java.util.TimeZone; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.client.OozieClient; 029import org.apache.oozie.dependency.URIHandler.Context; 030import org.apache.oozie.dependency.URIHandler; 031import org.apache.oozie.util.DateUtils; 032import org.apache.oozie.util.ELEvaluator; 033import org.apache.oozie.util.ParamChecker; 034import org.apache.oozie.util.XLog; 035import org.apache.oozie.service.Services; 036import org.apache.oozie.service.URIHandlerService; 037 038/** 039 * This class implements the EL function related to coordinator 040 */ 041 042public class CoordELFunctions { 043 final public static String DATASET = "oozie.coord.el.dataset.bean"; 044 final public static String COORD_ACTION = "oozie.coord.el.app.bean"; 045 final public static String CONFIGURATION = "oozie.coord.el.conf"; 046 final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time"; 047 // INSTANCE_SEPARATOR is used to separate multiple directories into one tag. 048 final public static String INSTANCE_SEPARATOR = "#"; 049 final public static String DIR_SEPARATOR = ","; 050 // TODO: in next release, support flexibility 051 private static String END_OF_OPERATION_INDICATOR_FILE = "_SUCCESS"; 052 053 /** 054 * Used in defining the frequency in 'day' unit. <p/> domain: <code> val > 0</code> and should be integer. 055 * 056 * @param val frequency in number of days. 057 * @return number of days and also set the frequency timeunit to "day" 058 */ 059 public static int ph1_coord_days(int val) { 060 val = ParamChecker.checkGTZero(val, "n"); 061 ELEvaluator eval = ELEvaluator.getCurrent(); 062 eval.setVariable("timeunit", TimeUnit.DAY); 063 eval.setVariable("endOfDuration", TimeUnit.NONE); 064 return val; 065 } 066 067 /** 068 * Used in defining the frequency in 'month' unit. <p/> domain: <code> val > 0</code> and should be integer. 069 * 070 * @param val frequency in number of months. 071 * @return number of months and also set the frequency timeunit to "month" 072 */ 073 public static int ph1_coord_months(int val) { 074 val = ParamChecker.checkGTZero(val, "n"); 075 ELEvaluator eval = ELEvaluator.getCurrent(); 076 eval.setVariable("timeunit", TimeUnit.MONTH); 077 eval.setVariable("endOfDuration", TimeUnit.NONE); 078 return val; 079 } 080 081 /** 082 * Used in defining the frequency in 'hour' unit. <p/> parameter value domain: <code> val > 0</code> and should 083 * be integer. 084 * 085 * @param val frequency in number of hours. 086 * @return number of minutes and also set the frequency timeunit to "minute" 087 */ 088 public static int ph1_coord_hours(int val) { 089 val = ParamChecker.checkGTZero(val, "n"); 090 ELEvaluator eval = ELEvaluator.getCurrent(); 091 eval.setVariable("timeunit", TimeUnit.MINUTE); 092 eval.setVariable("endOfDuration", TimeUnit.NONE); 093 return val * 60; 094 } 095 096 /** 097 * Used in defining the frequency in 'minute' unit. <p/> domain: <code> val > 0</code> and should be integer. 098 * 099 * @param val frequency in number of minutes. 100 * @return number of minutes and also set the frequency timeunit to "minute" 101 */ 102 public static int ph1_coord_minutes(int val) { 103 val = ParamChecker.checkGTZero(val, "n"); 104 ELEvaluator eval = ELEvaluator.getCurrent(); 105 eval.setVariable("timeunit", TimeUnit.MINUTE); 106 eval.setVariable("endOfDuration", TimeUnit.NONE); 107 return val; 108 } 109 110 /** 111 * Used in defining the frequency in 'day' unit and specify the "end of day" property. <p/> Every instance will 112 * start at 00:00 hour of each day. <p/> domain: <code> val > 0</code> and should be integer. 113 * 114 * @param val frequency in number of days. 115 * @return number of days and also set the frequency timeunit to "day" and end_of_duration flag to "day" 116 */ 117 public static int ph1_coord_endOfDays(int val) { 118 val = ParamChecker.checkGTZero(val, "n"); 119 ELEvaluator eval = ELEvaluator.getCurrent(); 120 eval.setVariable("timeunit", TimeUnit.DAY); 121 eval.setVariable("endOfDuration", TimeUnit.END_OF_DAY); 122 return val; 123 } 124 125 /** 126 * Used in defining the frequency in 'month' unit and specify the "end of month" property. <p/> Every instance will 127 * start at first day of each month at 00:00 hour. <p/> domain: <code> val > 0</code> and should be integer. 128 * 129 * @param val: frequency in number of months. 130 * @return number of months and also set the frequency timeunit to "month" and end_of_duration flag to "month" 131 */ 132 public static int ph1_coord_endOfMonths(int val) { 133 val = ParamChecker.checkGTZero(val, "n"); 134 ELEvaluator eval = ELEvaluator.getCurrent(); 135 eval.setVariable("timeunit", TimeUnit.MONTH); 136 eval.setVariable("endOfDuration", TimeUnit.END_OF_MONTH); 137 return val; 138 } 139 140 /** 141 * Calculate the difference of timezone offset in minutes between dataset and coordinator job. <p/> Depends on: <p/> 142 * 1. Timezone of both dataset and job <p/> 2. Action creation Time 143 * 144 * @return difference in minutes (DataSet TZ Offset - Application TZ offset) 145 */ 146 public static int ph2_coord_tzOffset() { 147 long actionCreationTime = getActionCreationtime().getTime(); 148 TimeZone dsTZ = ParamChecker.notNull(getDatasetTZ(), "DatasetTZ"); 149 TimeZone jobTZ = ParamChecker.notNull(getJobTZ(), "JobTZ"); 150 return (dsTZ.getOffset(actionCreationTime) - jobTZ.getOffset(actionCreationTime)) / (1000 * 60); 151 } 152 153 public static int ph3_coord_tzOffset() { 154 return ph2_coord_tzOffset(); 155 } 156 157 /** 158 * Returns the a date string while given a base date in 'strBaseDate', 159 * offset and unit (e.g. DAY, MONTH, HOUR, MINUTE, MONTH). 160 * 161 * @param strBaseDate -- base date 162 * @param offset -- any number 163 * @param unit -- DAY, MONTH, HOUR, MINUTE, MONTH 164 * @return date string 165 * @throws Exception 166 */ 167 public static String ph2_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 168 Calendar baseCalDate = DateUtils.getCalendar(strBaseDate); 169 StringBuilder buffer = new StringBuilder(); 170 baseCalDate.add(TimeUnit.valueOf(unit).getCalendarUnit(), offset); 171 buffer.append(DateUtils.formatDateOozieTZ(baseCalDate)); 172 return buffer.toString(); 173 } 174 175 public static String ph3_coord_dateOffset(String strBaseDate, int offset, String unit) throws Exception { 176 return ph2_coord_dateOffset(strBaseDate, offset, unit); 177 } 178 179 /** 180 * Determine the date-time in Oozie processing timezone of n-th future available dataset instance 181 * from nominal Time but not beyond the instance specified as 'instance. 182 * <p/> 183 * It depends on: 184 * <p/> 185 * 1. Data set frequency 186 * <p/> 187 * 2. Data set Time unit (day, month, minute) 188 * <p/> 189 * 3. Data set Time zone/DST 190 * <p/> 191 * 4. End Day/Month flag 192 * <p/> 193 * 5. Data set initial instance 194 * <p/> 195 * 6. Action Creation Time 196 * <p/> 197 * 7. Existence of dataset's directory 198 * 199 * @param n :instance count 200 * <p/> 201 * domain: n >= 0, n is integer 202 * @param instance: How many future instance it should check? value should 203 * be >=0 204 * @return date-time in Oozie processing timezone of the n-th instance 205 * <p/> 206 * @throws Exception 207 */ 208 public static String ph3_coord_future(int n, int instance) throws Exception { 209 ParamChecker.checkGEZero(n, "future:n"); 210 ParamChecker.checkGTZero(instance, "future:instance"); 211 if (isSyncDataSet()) {// For Sync Dataset 212 return coord_future_sync(n, instance); 213 } 214 else { 215 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 216 } 217 } 218 219 /** 220 * Determine the date-time in Oozie processing timezone of the future available dataset instances 221 * from start to end offsets from nominal Time but not beyond the instance specified as 'instance'. 222 * <p/> 223 * It depends on: 224 * <p/> 225 * 1. Data set frequency 226 * <p/> 227 * 2. Data set Time unit (day, month, minute) 228 * <p/> 229 * 3. Data set Time zone/DST 230 * <p/> 231 * 4. End Day/Month flag 232 * <p/> 233 * 5. Data set initial instance 234 * <p/> 235 * 6. Action Creation Time 236 * <p/> 237 * 7. Existence of dataset's directory 238 * 239 * @param start : start instance offset 240 * <p/> 241 * domain: start >= 0, start is integer 242 * @param end : end instance offset 243 * <p/> 244 * domain: end >= 0, end is integer 245 * @param instance: How many future instance it should check? value should 246 * be >=0 247 * @return date-time in Oozie processing timezone of the instances from start to end offsets 248 * delimited by comma. 249 * <p/> 250 * @throws Exception 251 */ 252 public static String ph3_coord_futureRange(int start, int end, int instance) throws Exception { 253 ParamChecker.checkGEZero(start, "future:n"); 254 ParamChecker.checkGEZero(end, "future:n"); 255 ParamChecker.checkGTZero(instance, "future:instance"); 256 if (isSyncDataSet()) {// For Sync Dataset 257 return coord_futureRange_sync(start, end, instance); 258 } 259 else { 260 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 261 } 262 } 263 264 private static String coord_future_sync(int n, int instance) throws Exception { 265 return coord_futureRange_sync(n, n, instance); 266 } 267 268 private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception { 269 final XLog LOG = XLog.getLog(CoordELFunctions.class); 270 final Thread currentThread = Thread.currentThread(); 271 ELEvaluator eval = ELEvaluator.getCurrent(); 272 String retVal = ""; 273 int datasetFrequency = (int) getDSFrequency();// in minutes 274 TimeUnit dsTimeUnit = getDSTimeUnit(); 275 int[] instCount = new int[1]; 276 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 277 StringBuilder resolvedInstances = new StringBuilder(); 278 StringBuilder resolvedURIPaths = new StringBuilder(); 279 if (nominalInstanceCal != null) { 280 Calendar initInstance = getInitialInstanceCal(); 281 nominalInstanceCal = (Calendar) initInstance.clone(); 282 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 283 284 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 285 if (ds == null) { 286 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 287 } 288 String uriTemplate = ds.getUriTemplate(); 289 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 290 if (conf == null) { 291 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 292 } 293 int available = 0, checkedInstance = 0; 294 boolean resolved = false; 295 String user = ParamChecker 296 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 297 String doneFlag = ds.getDoneFlag(); 298 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 299 URIHandler uriHandler = null; 300 Context uriContext = null; 301 try { 302 while (instance >= checkedInstance && !currentThread.isInterrupted()) { 303 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 304 String uriPath = uriEval.evaluate(uriTemplate, String.class); 305 if (uriHandler == null) { 306 URI uri = new URI(uriPath); 307 uriHandler = uriService.getURIHandler(uri); 308 uriContext = uriHandler.getContext(uri, conf, user); 309 } 310 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 311 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 312 if (available == endOffset) { 313 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 314 resolved = true; 315 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 316 resolvedURIPaths.append(uriPath); 317 retVal = resolvedInstances.toString(); 318 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 319 break; 320 } 321 else if (available >= startOffset) { 322 LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); 323 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 324 INSTANCE_SEPARATOR); 325 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 326 } 327 available++; 328 } 329 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); 330 nominalInstanceCal = (Calendar) initInstance.clone(); 331 instCount[0]++; 332 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 333 checkedInstance++; 334 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 335 } 336 } 337 finally { 338 if (uriContext != null) { 339 uriContext.destroy(); 340 } 341 } 342 if (!resolved) { 343 // return unchanged future function with variable 'is_resolved' 344 // to 'false' 345 eval.setVariable("is_resolved", Boolean.FALSE); 346 if (startOffset == endOffset) { 347 retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; 348 } 349 else { 350 retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}"; 351 } 352 } 353 else { 354 eval.setVariable("is_resolved", Boolean.TRUE); 355 } 356 } 357 else {// No feasible nominal time 358 eval.setVariable("is_resolved", Boolean.TRUE); 359 retVal = ""; 360 } 361 return retVal; 362 } 363 364 /** 365 * Return nominal time or Action Creation Time. 366 * <p/> 367 * 368 * @return coordinator action creation or materialization date time 369 * @throws Exception if unable to format the Date object to String 370 */ 371 public static String ph2_coord_nominalTime() throws Exception { 372 ELEvaluator eval = ELEvaluator.getCurrent(); 373 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 374 "Coordinator Action"); 375 return DateUtils.formatDateOozieTZ(action.getNominalTime()); 376 } 377 378 public static String ph3_coord_nominalTime() throws Exception { 379 return ph2_coord_nominalTime(); 380 } 381 382 /** 383 * Convert from standard date-time formatting to a desired format. 384 * <p/> 385 * @param dateTimeStr - A timestamp in standard (ISO8601) format. 386 * @param format - A string representing the desired format. 387 * @return coordinator action creation or materialization date time 388 * @throws Exception if unable to format the Date object to String 389 */ 390 public static String ph2_coord_formatTime(String dateTimeStr, String format) 391 throws Exception { 392 Date dateTime = DateUtils.parseDateOozieTZ(dateTimeStr); 393 return DateUtils.formatDateCustom(dateTime, format); 394 } 395 396 public static String ph3_coord_formatTime(String dateTimeStr, String format) 397 throws Exception { 398 return ph2_coord_formatTime(dateTimeStr, format); 399 } 400 401 /** 402 * Return Action Id. <p/> 403 * 404 * @return coordinator action Id 405 */ 406 public static String ph2_coord_actionId() throws Exception { 407 ELEvaluator eval = ELEvaluator.getCurrent(); 408 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 409 "Coordinator Action"); 410 return action.getActionId(); 411 } 412 413 public static String ph3_coord_actionId() throws Exception { 414 return ph2_coord_actionId(); 415 } 416 417 /** 418 * Return Job Name. <p/> 419 * 420 * @return coordinator name 421 */ 422 public static String ph2_coord_name() throws Exception { 423 ELEvaluator eval = ELEvaluator.getCurrent(); 424 SyncCoordAction action = ParamChecker.notNull((SyncCoordAction) eval.getVariable(COORD_ACTION), 425 "Coordinator Action"); 426 return action.getName(); 427 } 428 429 public static String ph3_coord_name() throws Exception { 430 return ph2_coord_name(); 431 } 432 433 /** 434 * Return Action Start time. <p/> 435 * 436 * @return coordinator action start time 437 * @throws Exception if unable to format the Date object to String 438 */ 439 public static String ph2_coord_actualTime() throws Exception { 440 ELEvaluator eval = ELEvaluator.getCurrent(); 441 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 442 if (coordAction == null) { 443 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 444 } 445 return DateUtils.formatDateOozieTZ(coordAction.getActualTime()); 446 } 447 448 public static String ph3_coord_actualTime() throws Exception { 449 return ph2_coord_actualTime(); 450 } 451 452 /** 453 * Used to specify a list of URI's that are used as input dir to the workflow job. <p/> Look for two evaluator-level 454 * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of 455 * URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something 456 * unresolved, this function will echo back the original function <p/> otherwise it sends the uris. 457 * 458 * @param dataInName : Datain name 459 * @return the list of URI's separated by INSTANCE_SEPARATOR <p/> if there are unresolved EL function (i.e. latest) 460 * , echo back <p/> the function without resolving the function. 461 */ 462 public static String ph3_coord_dataIn(String dataInName) { 463 String uris = ""; 464 ELEvaluator eval = ELEvaluator.getCurrent(); 465 uris = (String) eval.getVariable(".datain." + dataInName); 466 Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); 467 if (unresolved != null && unresolved.booleanValue() == true) { 468 return "${coord:dataIn('" + dataInName + "')}"; 469 } 470 return uris; 471 } 472 473 /** 474 * Used to specify a list of URI's that are output dir of the workflow job. <p/> Look for one evaluator-level 475 * variable <p/> dataout.<DATAOUT_NAME> <p/> It defines the current list of URI. <p/> otherwise it sends the uris. 476 * 477 * @param dataOutName : Dataout name 478 * @return the list of URI's separated by INSTANCE_SEPARATOR 479 */ 480 public static String ph3_coord_dataOut(String dataOutName) { 481 String uris = ""; 482 ELEvaluator eval = ELEvaluator.getCurrent(); 483 uris = (String) eval.getVariable(".dataout." + dataOutName); 484 return uris; 485 } 486 487 /** 488 * Determine the date-time in Oozie processing timezone of n-th dataset instance. <p/> It depends on: <p/> 1. 489 * Data set frequency <p/> 2. 490 * Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. Data 491 * set initial instance <p/> 6. Action Creation Time 492 * 493 * @param n instance count domain: n is integer 494 * @return date-time in Oozie processing timezone of the n-th instance returns 'null' means n-th instance is 495 * earlier than Initial-Instance of DS 496 * @throws Exception 497 */ 498 public static String ph2_coord_current(int n) throws Exception { 499 if (isSyncDataSet()) { // For Sync Dataset 500 return coord_current_sync(n); 501 } 502 else { 503 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 504 } 505 } 506 507 /** 508 * Determine the date-time in Oozie processing timezone of current dataset instances 509 * from start to end offsets from the nominal time. <p/> It depends 510 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 511 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time 512 * 513 * @param start :start instance offset <p/> domain: start <= 0, start is integer 514 * @param end :end instance offset <p/> domain: end <= 0, end is integer 515 * @return date-time in Oozie processing timezone of the instances from start to end offsets 516 * delimited by comma. <p/> If the current instance time of the dataset based on the Action Creation Time 517 * is earlier than the Initial-Instance of DS an empty string is returned. 518 * If an instance within the range is earlier than Initial-Instance of DS that instance is ignored 519 * @throws Exception 520 */ 521 public static String ph2_coord_currentRange(int start, int end) throws Exception { 522 if (isSyncDataSet()) { // For Sync Dataset 523 return coord_currentRange_sync(start, end); 524 } 525 else { 526 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 527 } 528 } 529 530 /** 531 * Determine the date-time in Oozie processing timezone of the given offset from the dataset effective nominal time. <p/> It 532 * depends on: <p> 1. Data set frequency <p/> 2. Data set Time Unit <p/> 3. Data set Time zone/DST 533 * <p/> 4. Data set initial instance <p/> 5. Action Creation Time 534 * 535 * @param n offset amount (integer) 536 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 537 * @return date-time in Oozie processing timezone of the given offset from the dataset effective nominal time 538 * @throws Exception if there was a problem formatting 539 */ 540 public static String ph2_coord_offset(int n, String timeUnit) throws Exception { 541 if (isSyncDataSet()) { // For Sync Dataset 542 return coord_offset_sync(n, timeUnit); 543 } 544 else { 545 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 546 } 547 } 548 549 /** 550 * Determine how many hours is on the date of n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency 551 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 552 * Data set initial instance <p/> 6. Action Creation Time 553 * 554 * @param n instance count <p/> domain: n is integer 555 * @return number of hours on that day <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 556 * @throws Exception 557 */ 558 public static int ph2_coord_hoursInDay(int n) throws Exception { 559 int datasetFrequency = (int) getDSFrequency(); 560 // /Calendar nominalInstanceCal = 561 // getCurrentInstance(getActionCreationtime()); 562 Calendar nominalInstanceCal = getEffectiveNominalTime(); 563 if (nominalInstanceCal == null) { 564 return -1; 565 } 566 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 567 /* 568 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 569 * { return -1; } 570 */ 571 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 572 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 573 return DateUtils.hoursInDay(nominalInstanceCal); 574 } 575 576 public static int ph3_coord_hoursInDay(int n) throws Exception { 577 return ph2_coord_hoursInDay(n); 578 } 579 580 /** 581 * Calculate number of days in one month for n-th dataset instance. <p/> It depends on: <p/> 1. Data set frequency . 582 * <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST <p/> 4. End Day/Month flag <p/> 5. 583 * Data set initial instance <p/> 6. Action Creation Time 584 * 585 * @param n instance count. domain: n is integer 586 * @return number of days in that month <p/> returns -1 means n-th instance is earlier than Initial-Instance of DS 587 * @throws Exception 588 */ 589 public static int ph2_coord_daysInMonth(int n) throws Exception { 590 int datasetFrequency = (int) getDSFrequency();// in minutes 591 // Calendar nominalInstanceCal = 592 // getCurrentInstance(getActionCreationtime()); 593 Calendar nominalInstanceCal = getEffectiveNominalTime(); 594 if (nominalInstanceCal == null) { 595 return -1; 596 } 597 nominalInstanceCal.add(getDSTimeUnit().getCalendarUnit(), datasetFrequency * n); 598 /* 599 * if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) 600 * { return -1; } 601 */ 602 nominalInstanceCal.setTimeZone(getDatasetTZ());// Use Dataset TZ 603 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 604 return nominalInstanceCal.getActualMaximum(Calendar.DAY_OF_MONTH); 605 } 606 607 public static int ph3_coord_daysInMonth(int n) throws Exception { 608 return ph2_coord_daysInMonth(n); 609 } 610 611 /** 612 * Determine the date-time in Oozie processing timezone of n-th latest available dataset instance. <p/> It depends 613 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 614 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 615 * dataset's directory 616 * 617 * @param n :instance count <p/> domain: n <= 0, n is integer 618 * @return date-time in Oozie processing timezone of the n-th instance <p/> returns 'null' means n-th instance is 619 * earlier than Initial-Instance of DS 620 * @throws Exception 621 */ 622 public static String ph3_coord_latest(int n) throws Exception { 623 ParamChecker.checkLEZero(n, "latest:n"); 624 if (isSyncDataSet()) {// For Sync Dataset 625 return coord_latest_sync(n); 626 } 627 else { 628 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 629 } 630 } 631 632 /** 633 * Determine the date-time in Oozie processing timezone of latest available dataset instances 634 * from start to end offsets from the nominal time. <p/> It depends 635 * on: <p/> 1. Data set frequency <p/> 2. Data set Time unit (day, month, minute) <p/> 3. Data set Time zone/DST 636 * <p/> 4. End Day/Month flag <p/> 5. Data set initial instance <p/> 6. Action Creation Time <p/> 7. Existence of 637 * dataset's directory 638 * 639 * @param start :start instance offset <p/> domain: start <= 0, start is integer 640 * @param end :end instance offset <p/> domain: end <= 0, end is integer 641 * @return date-time in Oozie processing timezone of the instances from start to end offsets 642 * delimited by comma. <p/> returns 'null' means start offset instance is 643 * earlier than Initial-Instance of DS 644 * @throws Exception 645 */ 646 public static String ph3_coord_latestRange(int start, int end) throws Exception { 647 ParamChecker.checkLEZero(start, "latest:n"); 648 ParamChecker.checkLEZero(end, "latest:n"); 649 if (isSyncDataSet()) {// For Sync Dataset 650 return coord_latestRange_sync(start, end); 651 } 652 else { 653 throw new UnsupportedOperationException("Asynchronous Dataset is not supported yet"); 654 } 655 } 656 657 /** 658 * Configure an evaluator with data set and application specific information. <p/> Helper method of associating 659 * dataset and application object 660 * 661 * @param evaluator : to set variables 662 * @param ds : Data Set object 663 * @param coordAction : Application instance 664 */ 665 public static void configureEvaluator(ELEvaluator evaluator, SyncCoordDataset ds, SyncCoordAction coordAction) { 666 evaluator.setVariable(COORD_ACTION, coordAction); 667 evaluator.setVariable(DATASET, ds); 668 } 669 670 /** 671 * Helper method to wrap around with "${..}". <p/> 672 * 673 * 674 * @param eval :EL evaluator 675 * @param expr : expression to evaluate 676 * @return Resolved expression or echo back the same expression 677 * @throws Exception 678 */ 679 public static String evalAndWrap(ELEvaluator eval, String expr) throws Exception { 680 try { 681 eval.setVariable(".wrap", null); 682 String result = eval.evaluate(expr, String.class); 683 if (eval.getVariable(".wrap") != null) { 684 return "${" + result + "}"; 685 } 686 else { 687 return result; 688 } 689 } 690 catch (Exception e) { 691 throw new Exception("Unable to evaluate :" + expr + ":\n", e); 692 } 693 } 694 695 // Set of echo functions 696 697 public static String ph1_coord_current_echo(String n) { 698 return echoUnResolved("current", n); 699 } 700 701 public static String ph1_coord_currentRange_echo(String start, String end) { 702 return echoUnResolved("currentRange", start + ", " + end); 703 } 704 705 public static String ph1_coord_offset_echo(String n, String timeUnit) { 706 return echoUnResolved("offset", n + " , " + timeUnit); 707 } 708 709 public static String ph2_coord_current_echo(String n) { 710 return echoUnResolved("current", n); 711 } 712 713 public static String ph2_coord_currentRange_echo(String start, String end) { 714 return echoUnResolved("currentRange", start + ", " + end); 715 } 716 717 public static String ph2_coord_offset_echo(String n, String timeUnit) { 718 return echoUnResolved("offset", n + " , " + timeUnit); 719 } 720 721 public static String ph1_coord_dateOffset_echo(String n, String offset, String unit) { 722 return echoUnResolved("dateOffset", n + " , " + offset + " , " + unit); 723 } 724 725 public static String ph1_coord_formatTime_echo(String dateTime, String format) { 726 // Quote the dateTime value since it would contain a ':'. 727 return echoUnResolved("formatTime", "'"+dateTime+"'" + " , " + format); 728 } 729 730 public static String ph1_coord_latest_echo(String n) { 731 return echoUnResolved("latest", n); 732 } 733 734 public static String ph2_coord_latest_echo(String n) { 735 return ph1_coord_latest_echo(n); 736 } 737 738 public static String ph1_coord_future_echo(String n, String instance) { 739 return echoUnResolved("future", n + ", " + instance + ""); 740 } 741 742 public static String ph2_coord_future_echo(String n, String instance) { 743 return ph1_coord_future_echo(n, instance); 744 } 745 746 public static String ph1_coord_latestRange_echo(String start, String end) { 747 return echoUnResolved("latestRange", start + ", " + end); 748 } 749 750 public static String ph2_coord_latestRange_echo(String start, String end) { 751 return ph1_coord_latestRange_echo(start, end); 752 } 753 754 public static String ph1_coord_futureRange_echo(String start, String end, String instance) { 755 return echoUnResolved("futureRange", start + ", " + end + ", " + instance); 756 } 757 758 public static String ph2_coord_futureRange_echo(String start, String end, String instance) { 759 return ph1_coord_futureRange_echo(start, end, instance); 760 } 761 762 public static String ph1_coord_dataIn_echo(String n) { 763 ELEvaluator eval = ELEvaluator.getCurrent(); 764 String val = (String) eval.getVariable("oozie.dataname." + n); 765 if (val == null || val.equals("data-in") == false) { 766 XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid"); 767 throw new RuntimeException("data_in_name " + n + " is not valid"); 768 } 769 return echoUnResolved("dataIn", "'" + n + "'"); 770 } 771 772 public static String ph1_coord_dataOut_echo(String n) { 773 ELEvaluator eval = ELEvaluator.getCurrent(); 774 String val = (String) eval.getVariable("oozie.dataname." + n); 775 if (val == null || val.equals("data-out") == false) { 776 XLog.getLog(CoordELFunctions.class).error("data_out_name " + n + " is not valid"); 777 throw new RuntimeException("data_out_name " + n + " is not valid"); 778 } 779 return echoUnResolved("dataOut", "'" + n + "'"); 780 } 781 782 public static String ph1_coord_nominalTime_echo() { 783 return echoUnResolved("nominalTime", ""); 784 } 785 786 public static String ph1_coord_nominalTime_echo_wrap() { 787 // return "${coord:nominalTime()}"; // no resolution 788 return echoUnResolved("nominalTime", ""); 789 } 790 791 public static String ph1_coord_nominalTime_echo_fixed() { 792 return "2009-03-06T010:00"; // Dummy resolution 793 } 794 795 public static String ph1_coord_actualTime_echo_wrap() { 796 // return "${coord:actualTime()}"; // no resolution 797 return echoUnResolved("actualTime", ""); 798 } 799 800 public static String ph1_coord_actionId_echo() { 801 return echoUnResolved("actionId", ""); 802 } 803 804 public static String ph1_coord_name_echo() { 805 return echoUnResolved("name", ""); 806 } 807 808 // The following echo functions are not used in any phases yet 809 // They are here for future purpose. 810 public static String coord_minutes_echo(String n) { 811 return echoUnResolved("minutes", n); 812 } 813 814 public static String coord_hours_echo(String n) { 815 return echoUnResolved("hours", n); 816 } 817 818 public static String coord_days_echo(String n) { 819 return echoUnResolved("days", n); 820 } 821 822 public static String coord_endOfDay_echo(String n) { 823 return echoUnResolved("endOfDay", n); 824 } 825 826 public static String coord_months_echo(String n) { 827 return echoUnResolved("months", n); 828 } 829 830 public static String coord_endOfMonth_echo(String n) { 831 return echoUnResolved("endOfMonth", n); 832 } 833 834 public static String coord_actualTime_echo() { 835 return echoUnResolved("actualTime", ""); 836 } 837 838 // This echo function will always return "24" for validation only. 839 // This evaluation ****should not**** replace the original XML 840 // Create a temporary string and validate the function 841 // This is **required** for evaluating an expression like 842 // coord:HoursInDay(0) + 3 843 // actual evaluation will happen in phase 2 or phase 3. 844 public static String ph1_coord_hoursInDay_echo(String n) { 845 return "24"; 846 // return echoUnResolved("hoursInDay", n); 847 } 848 849 // This echo function will always return "30" for validation only. 850 // This evaluation ****should not**** replace the original XML 851 // Create a temporary string and validate the function 852 // This is **required** for evaluating an expression like 853 // coord:daysInMonth(0) + 3 854 // actual evaluation will happen in phase 2 or phase 3. 855 public static String ph1_coord_daysInMonth_echo(String n) { 856 // return echoUnResolved("daysInMonth", n); 857 return "30"; 858 } 859 860 // This echo function will always return "3" for validation only. 861 // This evaluation ****should not**** replace the original XML 862 // Create a temporary string and validate the function 863 // This is **required** for evaluating an expression like coord:tzOffset + 2 864 // actual evaluation will happen in phase 2 or phase 3. 865 public static String ph1_coord_tzOffset_echo() { 866 // return echoUnResolved("tzOffset", ""); 867 return "3"; 868 } 869 870 // Local methods 871 /** 872 * @param n 873 * @return n-th instance Date-Time from current instance for data-set <p/> return empty string ("") if the 874 * Action_Creation_time or the n-th instance <p/> is earlier than the Initial_Instance of dataset. 875 * @throws Exception 876 */ 877 private static String coord_current_sync(int n) throws Exception { 878 return coord_currentRange_sync(n, n); 879 } 880 881 private static String coord_currentRange_sync(int start, int end) throws Exception { 882 final XLog LOG = XLog.getLog(CoordELFunctions.class); 883 int datasetFrequency = getDSFrequency();// in minutes 884 TimeUnit dsTimeUnit = getDSTimeUnit(); 885 int[] instCount = new int[1];// used as pass by ref 886 Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); 887 StringBuilder instanceList = new StringBuilder(); 888 if (nominalInstanceCal == null) { 889 LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is" 890 + " returned. This means that no data is available at the current-instance specified by the user" 891 + " and the user could try modifying his initial-instance to an earlier time."); 892 return ""; 893 } else { 894 Calendar initInstance = getInitialInstanceCal(); 895 instCount[0] = instCount[0] + end; 896 // Add in the reverse order - newest instance first. 897 for (int i = end; i >= start; i--) { 898 // Tried to avoid the clone. But subtracting datasetFrequency gives different results than multiplying 899 // and Spring DST transition test in TestCoordELfunctions.testCurrent() fails 900 //nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency); 901 nominalInstanceCal = (Calendar) initInstance.clone(); 902 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 903 instCount[0]--; 904 if (nominalInstanceCal.compareTo(initInstance) < 0) { 905 LOG.warn("If the initial instance of the dataset is later than the current-instance specified," 906 + " such as coord:current({0}) in this case, an empty string is returned. This means that" 907 + " no data is available at the current-instance specified by the user and the user could" 908 + " try modifying his initial-instance to an earlier time.", start); 909 break; 910 } 911 else { 912 instanceList.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 913 instanceList.append(CoordELFunctions.INSTANCE_SEPARATOR); 914 } 915 } 916 } 917 918 if (instanceList.length() > 0) { 919 instanceList.setLength(instanceList.length() - CoordELFunctions.INSTANCE_SEPARATOR.length()); 920 } 921 return instanceList.toString(); 922 } 923 924 /** 925 * 926 * @param n offset amount (integer) 927 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 928 * @return the offset time from the effective nominal time <p/> return empty string ("") if the Action_Creation_time or the 929 * offset instance <p/> is earlier than the Initial_Instance of dataset. 930 * @throws Exception 931 */ 932 private static String coord_offset_sync(int n, String timeUnit) throws Exception { 933 Calendar rawCal = resolveOffsetRawTime(n, TimeUnit.valueOf(timeUnit), null); 934 if (rawCal == null) { 935 // warning already logged by resolveOffsetRawTime() 936 return ""; 937 } 938 939 int freq = getDSFrequency(); 940 TimeUnit freqUnit = getDSTimeUnit(); 941 int freqCount = 0; 942 // We're going to manually turn back/forward cal by decrements/increments of freq and then check that it gives the same 943 // time as rawCal; this is to check that the offset time resolves to a frequency offset of the effective nominal time 944 // In other words, that there exists an integer x, such that coord:offset(n, timeUnit) == coord:current(x) is true 945 // If not, then we'll "rewind" rawCal to the latest instance earlier than rawCal and use that. 946 Calendar cal = getInitialInstanceCal(); 947 if (rawCal.before(cal)) { 948 while (cal.after(rawCal)) { 949 cal.add(freqUnit.getCalendarUnit(), -freq); 950 freqCount--; 951 } 952 } 953 else if (rawCal.after(cal)) { 954 while (cal.before(rawCal)) { 955 cal.add(freqUnit.getCalendarUnit(), freq); 956 freqCount++; 957 } 958 } 959 if (cal.before(rawCal)) { 960 rawCal = cal; 961 } 962 else if (cal.after(rawCal)) { 963 cal.add(freqUnit.getCalendarUnit(), -freq); 964 rawCal = cal; 965 freqCount--; 966 } 967 String rawCalStr = DateUtils.formatDateOozieTZ(rawCal); 968 969 Calendar nominalInstanceCal = getInitialInstanceCal(); 970 nominalInstanceCal.add(freqUnit.getCalendarUnit(), freq * freqCount); 971 if (nominalInstanceCal.getTime().compareTo(getInitialInstance()) < 0) { 972 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the offset instance" 973 + " specified, such as coord:offset({0}, {1}) in this case, an empty string is returned. This means that no" 974 + " data is available at the offset instance specified by the user and the user could try modifying his" 975 + " initial-instance to an earlier time.", n, timeUnit); 976 return ""; 977 } 978 String nominalCalStr = DateUtils.formatDateOozieTZ(nominalInstanceCal); 979 980 if (!rawCalStr.equals(nominalCalStr)) { 981 throw new RuntimeException("Shouldn't happen"); 982 } 983 return rawCalStr; 984 } 985 986 /** 987 * @param offset 988 * @return n-th available latest instance Date-Time for SYNC data-set 989 * @throws Exception 990 */ 991 private static String coord_latest_sync(int offset) throws Exception { 992 return coord_latestRange_sync(offset, offset); 993 } 994 995 private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception { 996 final XLog LOG = XLog.getLog(CoordELFunctions.class); 997 final Thread currentThread = Thread.currentThread(); 998 ELEvaluator eval = ELEvaluator.getCurrent(); 999 String retVal = ""; 1000 int datasetFrequency = (int) getDSFrequency();// in minutes 1001 TimeUnit dsTimeUnit = getDSTimeUnit(); 1002 int[] instCount = new int[1]; 1003 boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false); 1004 Calendar nominalInstanceCal; 1005 if (useCurrentTime) { 1006 nominalInstanceCal = getCurrentInstance(new Date(), instCount); 1007 } 1008 else { 1009 nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); 1010 } 1011 StringBuilder resolvedInstances = new StringBuilder(); 1012 StringBuilder resolvedURIPaths = new StringBuilder(); 1013 if (nominalInstanceCal != null) { 1014 Calendar initInstance = getInitialInstanceCal(); 1015 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1016 if (ds == null) { 1017 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1018 } 1019 String uriTemplate = ds.getUriTemplate(); 1020 Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); 1021 if (conf == null) { 1022 throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); 1023 } 1024 int available = 0; 1025 boolean resolved = false; 1026 String user = ParamChecker 1027 .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); 1028 String doneFlag = ds.getDoneFlag(); 1029 URIHandlerService uriService = Services.get().get(URIHandlerService.class); 1030 URIHandler uriHandler = null; 1031 Context uriContext = null; 1032 try { 1033 while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) { 1034 ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); 1035 String uriPath = uriEval.evaluate(uriTemplate, String.class); 1036 if (uriHandler == null) { 1037 URI uri = new URI(uriPath); 1038 uriHandler = uriService.getURIHandler(uri); 1039 uriContext = uriHandler.getContext(uri, conf, user); 1040 } 1041 String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); 1042 if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { 1043 XLog.getLog(CoordELFunctions.class) 1044 .debug("Found latest(" + available + "): " + uriWithDoneFlag); 1045 if (available == startOffset) { 1046 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1047 resolved = true; 1048 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); 1049 resolvedURIPaths.append(uriPath); 1050 retVal = resolvedInstances.toString(); 1051 eval.setVariable("resolved_path", resolvedURIPaths.toString()); 1052 break; 1053 } 1054 else if (available <= endOffset) { 1055 LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); 1056 resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( 1057 INSTANCE_SEPARATOR); 1058 resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); 1059 } 1060 1061 available--; 1062 } 1063 // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency); 1064 nominalInstanceCal = (Calendar) initInstance.clone(); 1065 instCount[0]--; 1066 nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); 1067 // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); 1068 } 1069 } 1070 finally { 1071 if (uriContext != null) { 1072 uriContext.destroy(); 1073 } 1074 } 1075 if (!resolved) { 1076 // return unchanged latest function with variable 'is_resolved' 1077 // to 'false' 1078 eval.setVariable("is_resolved", Boolean.FALSE); 1079 if (startOffset == endOffset) { 1080 retVal = "${coord:latest(" + startOffset + ")}"; 1081 } 1082 else { 1083 retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}"; 1084 } 1085 } 1086 else { 1087 eval.setVariable("is_resolved", Boolean.TRUE); 1088 } 1089 } 1090 else {// No feasible nominal time 1091 eval.setVariable("is_resolved", Boolean.FALSE); 1092 } 1093 return retVal; 1094 } 1095 1096 /** 1097 * @param tm 1098 * @return a new Evaluator to be used for URI-template evaluation 1099 */ 1100 private static ELEvaluator getUriEvaluator(Calendar tm) { 1101 tm.setTimeZone(DateUtils.getOozieProcessingTimeZone()); 1102 ELEvaluator retEval = new ELEvaluator(); 1103 retEval.setVariable("YEAR", tm.get(Calendar.YEAR)); 1104 retEval.setVariable("MONTH", (tm.get(Calendar.MONTH) + 1) < 10 ? "0" + (tm.get(Calendar.MONTH) + 1) : (tm 1105 .get(Calendar.MONTH) + 1)); 1106 retEval.setVariable("DAY", tm.get(Calendar.DAY_OF_MONTH) < 10 ? "0" + tm.get(Calendar.DAY_OF_MONTH) : tm 1107 .get(Calendar.DAY_OF_MONTH)); 1108 retEval.setVariable("HOUR", tm.get(Calendar.HOUR_OF_DAY) < 10 ? "0" + tm.get(Calendar.HOUR_OF_DAY) : tm 1109 .get(Calendar.HOUR_OF_DAY)); 1110 retEval.setVariable("MINUTE", tm.get(Calendar.MINUTE) < 10 ? "0" + tm.get(Calendar.MINUTE) : tm 1111 .get(Calendar.MINUTE)); 1112 return retEval; 1113 } 1114 1115 /** 1116 * @return whether a data set is SYNCH or ASYNC 1117 */ 1118 private static boolean isSyncDataSet() { 1119 ELEvaluator eval = ELEvaluator.getCurrent(); 1120 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1121 if (ds == null) { 1122 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1123 } 1124 return ds.getType().equalsIgnoreCase("SYNC"); 1125 } 1126 1127 /** 1128 * Check whether a function should be resolved. 1129 * 1130 * @param functionName 1131 * @param n 1132 * @return null if the functionName needs to be resolved otherwise return the calling function unresolved. 1133 */ 1134 private static String checkIfResolved(String functionName, String n) { 1135 ELEvaluator eval = ELEvaluator.getCurrent(); 1136 String replace = (String) eval.getVariable("resolve_" + functionName); 1137 if (replace == null || (replace != null && replace.equalsIgnoreCase("false"))) { // Don't 1138 // resolve 1139 // return "${coord:" + functionName + "(" + n +")}"; //Unresolved 1140 eval.setVariable(".wrap", "true"); 1141 return "coord:" + functionName + "(" + n + ")"; // Unresolved 1142 } 1143 return null; // Resolved it 1144 } 1145 1146 private static String echoUnResolved(String functionName, String n) { 1147 return echoUnResolvedPre(functionName, n, "coord:"); 1148 } 1149 1150 private static String echoUnResolvedPre(String functionName, String n, String prefix) { 1151 ELEvaluator eval = ELEvaluator.getCurrent(); 1152 eval.setVariable(".wrap", "true"); 1153 return prefix + functionName + "(" + n + ")"; // Unresolved 1154 } 1155 1156 /** 1157 * @return the initial instance of a DataSet in DATE 1158 */ 1159 private static Date getInitialInstance() { 1160 ELEvaluator eval = ELEvaluator.getCurrent(); 1161 return getInitialInstance(eval); 1162 } 1163 1164 /** 1165 * @return the initial instance of a DataSet in DATE 1166 */ 1167 private static Date getInitialInstance(ELEvaluator eval) { 1168 return getInitialInstanceCal(eval).getTime(); 1169 // return ds.getInitInstance(); 1170 } 1171 1172 /** 1173 * @return the initial instance of a DataSet in Calendar 1174 */ 1175 private static Calendar getInitialInstanceCal() { 1176 ELEvaluator eval = ELEvaluator.getCurrent(); 1177 return getInitialInstanceCal(eval); 1178 } 1179 1180 /** 1181 * @return the initial instance of a DataSet in Calendar 1182 */ 1183 private static Calendar getInitialInstanceCal(ELEvaluator eval) { 1184 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1185 if (ds == null) { 1186 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1187 } 1188 Calendar effInitTS = Calendar.getInstance(); 1189 effInitTS.setTime(ds.getInitInstance()); 1190 effInitTS.setTimeZone(ds.getTimeZone()); 1191 // To adjust EOD/EOM 1192 DateUtils.moveToEnd(effInitTS, getDSEndOfFlag(eval)); 1193 return effInitTS; 1194 // return ds.getInitInstance(); 1195 } 1196 1197 /** 1198 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1199 */ 1200 private static Date getActionCreationtime() { 1201 ELEvaluator eval = ELEvaluator.getCurrent(); 1202 return getActionCreationtime(eval); 1203 } 1204 1205 /** 1206 * @return Nominal or action creation Time when all the dependencies of an application instance are met. 1207 */ 1208 private static Date getActionCreationtime(ELEvaluator eval) { 1209 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1210 if (coordAction == null) { 1211 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1212 } 1213 return coordAction.getNominalTime(); 1214 } 1215 1216 /** 1217 * @return Actual Time when all the dependencies of an application instance are met. 1218 */ 1219 private static Date getActualTime() { 1220 ELEvaluator eval = ELEvaluator.getCurrent(); 1221 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1222 if (coordAction == null) { 1223 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1224 } 1225 return coordAction.getActualTime(); 1226 } 1227 1228 /** 1229 * @return TimeZone for the application or job. 1230 */ 1231 private static TimeZone getJobTZ() { 1232 ELEvaluator eval = ELEvaluator.getCurrent(); 1233 SyncCoordAction coordAction = (SyncCoordAction) eval.getVariable(COORD_ACTION); 1234 if (coordAction == null) { 1235 throw new RuntimeException("Associated Application instance should be defined with key " + COORD_ACTION); 1236 } 1237 return coordAction.getTimeZone(); 1238 } 1239 1240 /** 1241 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1242 * 1243 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1244 * the dataset. 1245 */ 1246 public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { 1247 ELEvaluator eval = ELEvaluator.getCurrent(); 1248 return getCurrentInstance(effectiveTime, instanceCount, eval); 1249 } 1250 1251 /** 1252 * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) 1253 * 1254 * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of 1255 * the dataset. 1256 */ 1257 private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) { 1258 Date datasetInitialInstance = getInitialInstance(eval); 1259 TimeUnit dsTimeUnit = getDSTimeUnit(eval); 1260 TimeZone dsTZ = getDatasetTZ(eval); 1261 int dsFreq = getDSFrequency(eval); 1262 // Convert Date to Calendar for corresponding TZ 1263 Calendar current = Calendar.getInstance(); 1264 current.setTime(datasetInitialInstance); 1265 current.setTimeZone(dsTZ); 1266 1267 Calendar calEffectiveTime = Calendar.getInstance(); 1268 calEffectiveTime.setTime(effectiveTime); 1269 calEffectiveTime.setTimeZone(dsTZ); 1270 if (instanceCount == null) { // caller doesn't care about this value 1271 instanceCount = new int[1]; 1272 } 1273 instanceCount[0] = 0; 1274 if (current.compareTo(calEffectiveTime) > 0) { 1275 return null; 1276 } 1277 Calendar origCurrent = (Calendar) current.clone(); 1278 while (current.compareTo(calEffectiveTime) <= 0) { 1279 current = (Calendar) origCurrent.clone(); 1280 instanceCount[0]++; 1281 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1282 } 1283 instanceCount[0]--; 1284 1285 current = (Calendar) origCurrent.clone(); 1286 current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); 1287 return current; 1288 } 1289 1290 public static Calendar getEffectiveNominalTime() { 1291 Date datasetInitialInstance = getInitialInstance(); 1292 TimeZone dsTZ = getDatasetTZ(); 1293 // Convert Date to Calendar for corresponding TZ 1294 Calendar current = Calendar.getInstance(); 1295 current.setTime(datasetInitialInstance); 1296 current.setTimeZone(dsTZ); 1297 1298 Calendar calEffectiveTime = Calendar.getInstance(); 1299 calEffectiveTime.setTime(getActionCreationtime()); 1300 calEffectiveTime.setTimeZone(dsTZ); 1301 if (current.compareTo(calEffectiveTime) > 0) { 1302 // Nominal Time < initial Instance 1303 // TODO: getClass() call doesn't work from static method. 1304 // XLog.getLog("CoordELFunction.class").warn("ACTION CREATED BEFORE INITIAL INSTACE "+ 1305 // current.getTime()); 1306 return null; 1307 } 1308 return calEffectiveTime; 1309 } 1310 1311 /** 1312 * @return dataset frequency in minutes 1313 */ 1314 private static int getDSFrequency() { 1315 ELEvaluator eval = ELEvaluator.getCurrent(); 1316 return getDSFrequency(eval); 1317 } 1318 1319 /** 1320 * @return dataset frequency in minutes 1321 */ 1322 private static int getDSFrequency(ELEvaluator eval) { 1323 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1324 if (ds == null) { 1325 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1326 } 1327 return ds.getFrequency(); 1328 } 1329 1330 /** 1331 * @return dataset TimeUnit 1332 */ 1333 private static TimeUnit getDSTimeUnit() { 1334 ELEvaluator eval = ELEvaluator.getCurrent(); 1335 return getDSTimeUnit(eval); 1336 } 1337 1338 /** 1339 * @return dataset TimeUnit 1340 */ 1341 private static TimeUnit getDSTimeUnit(ELEvaluator eval) { 1342 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1343 if (ds == null) { 1344 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1345 } 1346 return ds.getTimeUnit(); 1347 } 1348 1349 /** 1350 * @return dataset TimeZone 1351 */ 1352 public static TimeZone getDatasetTZ() { 1353 ELEvaluator eval = ELEvaluator.getCurrent(); 1354 return getDatasetTZ(eval); 1355 } 1356 1357 /** 1358 * @return dataset TimeZone 1359 */ 1360 private static TimeZone getDatasetTZ(ELEvaluator eval) { 1361 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1362 if (ds == null) { 1363 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1364 } 1365 return ds.getTimeZone(); 1366 } 1367 1368 /** 1369 * @return dataset TimeUnit 1370 */ 1371 private static TimeUnit getDSEndOfFlag() { 1372 ELEvaluator eval = ELEvaluator.getCurrent(); 1373 return getDSEndOfFlag(eval); 1374 } 1375 1376 /** 1377 * @return dataset TimeUnit 1378 */ 1379 private static TimeUnit getDSEndOfFlag(ELEvaluator eval) { 1380 SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); 1381 if (ds == null) { 1382 throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); 1383 } 1384 return ds.getEndOfDuration();// == null ? "": ds.getEndOfDuration(); 1385 } 1386 1387 /** 1388 * Return a job configuration property for the coordinator. 1389 * 1390 * @param property property name. 1391 * @return the value of the property, <code>null</code> if the property is undefined. 1392 */ 1393 public static String coord_conf(String property) { 1394 ELEvaluator eval = ELEvaluator.getCurrent(); 1395 return (String) eval.getVariable(property); 1396 } 1397 1398 /** 1399 * Return the user that submitted the coordinator job. 1400 * 1401 * @return the user that submitted the coordinator job. 1402 */ 1403 public static String coord_user() { 1404 ELEvaluator eval = ELEvaluator.getCurrent(); 1405 return (String) eval.getVariable(OozieClient.USER_NAME); 1406 } 1407 1408 /** 1409 * Takes two offset times and returns a list of multiples of the frequency offset from the effective nominal time that occur 1410 * between them. The caller should make sure that startCal is earlier than endCal. 1411 * <p> 1412 * As a simple example, assume its the same day: startCal is 1:00, endCal is 2:00, frequency is 20min, and effective nominal 1413 * time is 1:20 -- then this method would return a list containing: -20, 0, 20, 40, 60 1414 * 1415 * @param startCal The earlier offset time 1416 * @param endCal The later offset time 1417 * @param eval The ELEvaluator to use; cannot be null 1418 * @return A list of multiple of the frequency offset from the effective nominal time that occur between the startCal and endCal 1419 */ 1420 public static List<Integer> expandOffsetTimes(Calendar startCal, Calendar endCal, ELEvaluator eval) { 1421 List<Integer> expandedFreqs = new ArrayList<Integer>(); 1422 // Use eval because the "current" eval isn't set 1423 int freq = getDSFrequency(eval); 1424 TimeUnit freqUnit = getDSTimeUnit(eval); 1425 Calendar cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1426 int totalFreq = 0; 1427 if (startCal.before(cal)) { 1428 while (cal.after(startCal)) { 1429 cal.add(freqUnit.getCalendarUnit(), -freq); 1430 totalFreq += -freq; 1431 } 1432 if (cal.before(startCal)) { 1433 cal.add(freqUnit.getCalendarUnit(), freq); 1434 totalFreq += freq; 1435 } 1436 } 1437 else if (startCal.after(cal)) { 1438 while (cal.before(startCal)) { 1439 cal.add(freqUnit.getCalendarUnit(), freq); 1440 totalFreq += freq; 1441 } 1442 } 1443 // At this point, cal is the smallest multiple of the dataset frequency that is >= to the startCal and offset from the 1444 // effective nominal time. Now we can find all of the instances that occur between startCal and endCal, inclusive. 1445 while (cal.before(endCal) || cal.equals(endCal)) { 1446 expandedFreqs.add(totalFreq); 1447 cal.add(freqUnit.getCalendarUnit(), freq); 1448 totalFreq += freq; 1449 } 1450 return expandedFreqs; 1451 } 1452 1453 /** 1454 * Resolve the offset time from the effective nominal time 1455 * 1456 * @param n offset amount (integer) 1457 * @param timeUnit TimeUnit for offset n ("MINUTE", "HOUR", "DAY", "MONTH", "YEAR") 1458 * @param eval The ELEvaluator to use; or null to use the "current" eval 1459 * @return A Calendar of the offset time 1460 */ 1461 public static Calendar resolveOffsetRawTime(int n, TimeUnit timeUnit, ELEvaluator eval) { 1462 // Use eval if given (for when the "current" eval isn't set) 1463 Calendar cal; 1464 if (eval == null) { 1465 cal = getCurrentInstance(getActionCreationtime(), null); 1466 } 1467 else { 1468 cal = getCurrentInstance(getActionCreationtime(eval), null, eval); 1469 } 1470 if (cal == null) { 1471 XLog.getLog(CoordELFunctions.class).warn("If the initial instance of the dataset is later than the nominal time, an" 1472 + " empty string is returned. This means that no data is available at the offset instance specified by the user" 1473 + " and the user could try modifying his or her initial-instance to an earlier time."); 1474 return null; 1475 } 1476 cal.add(timeUnit.getCalendarUnit(), n); 1477 return cal; 1478 } 1479}