/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.oozie.coord;
019
020import java.net.URI;
021import java.net.URISyntaxException;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.oozie.DagELFunctions;
025import org.apache.oozie.client.WorkflowJob;
026import org.apache.oozie.dependency.URIHandler;
027import org.apache.oozie.service.Services;
028import org.apache.oozie.service.URIHandlerService;
029import org.apache.oozie.util.ELEvaluator;
030import org.apache.oozie.util.HCatURI;
031import org.apache.oozie.util.XLog;
032
033/**
034 * This class implements the EL function for HCat datasets in coordinator
035 */
036
037public class HCatELFunctions {
038    private static final Configuration EMPTY_CONF = new Configuration(true);
039
040    enum EventType {
041        input, output
042    }
043
044    /* Workflow Parameterization EL functions */
045
046    /**
047     * Return true if partitions exists or false if not.
048     *
049     * @param uri hcatalog partition uri.
050     * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
051     * @throws Exception
052     */
053    public static boolean hcat_exists(String uri) throws Exception {
054        URI hcatURI = new URI(uri);
055        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
056        URIHandler handler = uriService.getURIHandler(hcatURI);
057        WorkflowJob workflow = DagELFunctions.getWorkflow();
058        String user = workflow.getUser();
059        return handler.exists(hcatURI, EMPTY_CONF, user);
060    }
061
062    /* Coord EL functions */
063
064    /**
065     * Echo the same EL function without evaluating anything
066     *
067     * @param dataInName
068     * @return the same EL function
069     */
070    public static String ph1_coord_databaseIn_echo(String dataName) {
071        // Checking if the dataIn is correct?
072        isValidDataEvent(dataName);
073        return echoUnResolved("databaseIn", "'" + dataName + "'");
074    }
075
076    public static String ph1_coord_databaseOut_echo(String dataName) {
077        // Checking if the dataOut is correct?
078        isValidDataEvent(dataName);
079        return echoUnResolved("databaseOut", "'" + dataName + "'");
080    }
081
082    public static String ph1_coord_tableIn_echo(String dataName) {
083        // Checking if the dataIn is correct?
084        isValidDataEvent(dataName);
085        return echoUnResolved("tableIn", "'" + dataName + "'");
086    }
087
088    public static String ph1_coord_tableOut_echo(String dataName) {
089        // Checking if the dataOut is correct?
090        isValidDataEvent(dataName);
091        return echoUnResolved("tableOut", "'" + dataName + "'");
092    }
093
094    public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
095        // Checking if the dataIn/dataOut is correct?
096        isValidDataEvent(dataInName);
097        return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
098    }
099
100    public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
101        // Checking if the dataIn/dataOut is correct?
102        isValidDataEvent(dataInName);
103        return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
104    }
105
106    public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
107        // Checking if the dataIn/dataOut is correct?
108        isValidDataEvent(dataInName);
109        return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
110    }
111
112    public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
113        // Checking if the dataIn/dataOut is correct?
114        isValidDataEvent(dataOutName);
115        return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
116    }
117
118    public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
119        // Checking if the dataIn/dataOut is correct?
120        isValidDataEvent(dataOutName);
121        return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
122    }
123
124    /**
125     * Extract the hcat DB name from the URI-template associate with
126     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
127     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
128     * (SyncCoordDataset)
129     *
130     * @param dataInName
131     * @return DB name
132     */
133    public static String ph3_coord_databaseIn(String dataName) {
134        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
135        if (hcatURI != null) {
136            return hcatURI.getDb();
137        }
138        else {
139            return "";
140        }
141    }
142
143    /**
144     * Extract the hcat DB name from the URI-template associate with
145     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
146     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
147     * (SyncCoordDataset)
148     *
149     * @param dataOutName
150     * @return DB name
151     */
152    public static String ph3_coord_databaseOut(String dataName) {
153        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
154        if (hcatURI != null) {
155            return hcatURI.getDb();
156        }
157        else {
158            return "";
159        }
160    }
161
162    /**
163     * Extract the hcat Table name from the URI-template associate with
164     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
165     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
166     * (SyncCoordDataset)
167     *
168     * @param dataInName
169     * @return Table name
170     */
171    public static String ph3_coord_tableIn(String dataName) {
172        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
173        if (hcatURI != null) {
174            return hcatURI.getTable();
175        }
176        else {
177            return "";
178        }
179    }
180
181    /**
182     * Extract the hcat Table name from the URI-template associate with
183     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
184     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
185     * (SyncCoordDataset)
186     *
187     * @param dataOutName
188     * @return Table name
189     */
190    public static String ph3_coord_tableOut(String dataName) {
191        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
192        if (hcatURI != null) {
193            return hcatURI.getTable();
194        }
195        else {
196            return "";
197        }
198    }
199
200    /**
201     * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two evaluator-level
202     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
203     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
204     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition filter.
205     *
206     * @param dataInName : Datain name
207     * @param type : for action type - pig, MR or hive
208     */
209    public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
210        ELEvaluator eval = ELEvaluator.getCurrent();
211        String uris = (String) eval.getVariable(".datain." + dataInName);
212        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
213        if (unresolved != null && unresolved.booleanValue() == true) {
214            return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
215        }
216        return createPartitionFilter(uris, type);
217    }
218
219    /**
220     * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two evaluator-level
221     * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the current list of
222     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
223     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition value.
224     *
225     * @param dataOutName : Dataout name
226     * @param partitionName : Specific partition name whose value is wanted
227     */
228    public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
229        ELEvaluator eval = ELEvaluator.getCurrent();
230        String uri = (String) eval.getVariable(".dataout." + dataOutName);
231        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
232        if (unresolved != null && unresolved.booleanValue() == true) {
233            return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
234        }
235        try {
236            HCatURI hcatUri = new HCatURI(uri);
237            return hcatUri.getPartitionValue(partitionName);
238        }
239        catch(URISyntaxException urie) {
240            XLog.getLog(HCatELFunctions.class).warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
241            throw new RuntimeException("HCat URI can't be parsed " + urie);
242        }
243    }
244
245    /**
246     * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two evaluator-level
247     * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the data-out
248     * HCat URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
249     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition.
250     *
251     * @param dataOutName : DataOut name
252     */
253    public static String ph3_coord_dataOutPartitions(String dataOutName) {
254        ELEvaluator eval = ELEvaluator.getCurrent();
255        String uri = (String) eval.getVariable(".dataout." + dataOutName);
256        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
257        if (unresolved != null && unresolved.booleanValue() == true) {
258            return "${coord:dataOutPartitions('" + dataOutName + "')}";
259        }
260        try {
261            return new HCatURI(uri).toPartitionString();
262        }
263        catch (URISyntaxException e) {
264            throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
265        }
266    }
267
268    /**
269     * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
270     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
271     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
272     * unresolved, this function will echo back the original function <p/> otherwise it sends the max partition value.
273     *
274     * @param dataInName : Datain name
275     * @param partitionName : Specific partition name whose MAX value is wanted
276     */
277    public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
278        ELEvaluator eval = ELEvaluator.getCurrent();
279        String uris = (String) eval.getVariable(".datain." + dataInName);
280        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
281        if (unresolved != null && unresolved.booleanValue() == true) {
282            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
283        }
284        String minPartition = null;
285        if (uris != null) {
286            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
287            // get the partition values list and find minimum
288            try {
289                // initialize minValue with first partition value
290                minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
291                if (minPartition == null || minPartition.isEmpty()) {
292                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
293                }
294                for (int i = 1; i < uriList.length; i++) {
295                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
296                        if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
297                                                                //values can also contain letters e.g. 20120101T0300Z (UTC)
298                            minPartition = value;
299                        }
300                }
301            }
302            catch(URISyntaxException urie) {
303                throw new RuntimeException("HCat URI can't be parsed " + urie);
304            }
305        }
306        else {
307            XLog.getLog(HCatELFunctions.class).warn("URI is null");
308            return null;
309        }
310        return minPartition;
311    }
312
313    /**
314     * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
315     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
316     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
317     * unresolved, this function will echo back the original function <p/> otherwise it sends the min partition value.
318     *
319     * @param dataInName : Datain name
320     * @param partitionName : Specific partition name whose MIN value is wanted
321     */
322    public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
323        ELEvaluator eval = ELEvaluator.getCurrent();
324        String uris = (String) eval.getVariable(".datain." + dataInName);
325        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
326        if (unresolved != null && unresolved.booleanValue() == true) {
327            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
328        }
329        String maxPartition = null;
330        if (uris != null) {
331            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
332            // get the partition values list and find minimum
333            try {
334                // initialize minValue with first partition value
335                maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
336                if (maxPartition == null || maxPartition.isEmpty()) {
337                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
338                }
339                for(int i = 1; i < uriList.length; i++) {
340                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
341                        if(value.compareTo(maxPartition) > 0) {
342                            maxPartition = value;
343                        }
344                }
345            }
346            catch(URISyntaxException urie) {
347                throw new RuntimeException("HCat URI can't be parsed " + urie);
348            }
349        }
350        else {
351            XLog.getLog(HCatELFunctions.class).warn("URI is null");
352            return null;
353        }
354        return maxPartition;
355    }
356
357    private static String createPartitionFilter(String uris, String type) {
358        String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
359        StringBuilder filter = new StringBuilder("");
360        if (uriList.length > 0) {
361            for (String uri : uriList) {
362                if (filter.length() > 0) {
363                    filter.append(" OR ");
364                }
365                try {
366                    filter.append(new HCatURI(uri).toPartitionFilter(type));
367                }
368                catch (URISyntaxException e) {
369                    throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
370                }
371            }
372        }
373        return filter.toString();
374    }
375
376    private static HCatURI getURIFromResolved(String dataInName, EventType type) {
377        final XLog LOG = XLog.getLog(HCatELFunctions.class);
378        StringBuilder uriTemplate = new StringBuilder();
379        ELEvaluator eval = ELEvaluator.getCurrent();
380        String uris;
381        if(type == EventType.input) {
382            uris = (String) eval.getVariable(".datain." + dataInName);
383        }
384        else { //type=output
385            uris = (String) eval.getVariable(".dataout." + dataInName);
386        }
387        if (uris != null) {
388            String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1);
389            uriTemplate.append(uri[0]);
390        }
391        else {
392            LOG.warn("URI is NULL");
393            return null;
394        }
395        LOG.info("uriTemplate [{0}] ", uriTemplate);
396        HCatURI hcatURI;
397        try {
398            hcatURI = new HCatURI(uriTemplate.toString());
399        }
400        catch (URISyntaxException e) {
401            LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
402            throw new RuntimeException("HCat URI can't be parsed " + e);
403        }
404        return hcatURI;
405    }
406
407    private static boolean isValidDataEvent(String dataInName) {
408        ELEvaluator eval = ELEvaluator.getCurrent();
409        String val = (String) eval.getVariable("oozie.dataname." + dataInName);
410        if (val == null || (val.equals("data-in") == false && val.equals("data-out") == false)) {
411            XLog.getLog(HCatELFunctions.class).error("dataset name " + dataInName + " is not valid. val :" + val);
412            throw new RuntimeException("data set name " + dataInName + " is not valid");
413        }
414        return true;
415    }
416
417    private static String echoUnResolved(String functionName, String n) {
418        return echoUnResolvedPre(functionName, n, "coord:");
419    }
420
421    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
422        ELEvaluator eval = ELEvaluator.getCurrent();
423        eval.setVariable(".wrap", "true");
424        return prefix + functionName + "(" + n + ")"; // Unresolved
425    }
426
427}