001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.IOException;
021import java.io.Writer;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.LinkedHashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.StringTokenizer;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.oozie.client.CoordinatorAction;
035import org.apache.oozie.client.CoordinatorJob;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.WorkflowJob;
038import org.apache.oozie.client.rest.RestConstants;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.coord.CoordActionInfoXCommand;
041import org.apache.oozie.util.CoordActionsInDateRange;
042import org.apache.oozie.command.coord.CoordChangeXCommand;
043import org.apache.oozie.command.coord.CoordJobXCommand;
044import org.apache.oozie.command.coord.CoordJobsXCommand;
045import org.apache.oozie.command.coord.CoordKillXCommand;
046import org.apache.oozie.command.coord.CoordRerunXCommand;
047import org.apache.oozie.command.coord.CoordResumeXCommand;
048import org.apache.oozie.command.coord.CoordSubmitXCommand;
049import org.apache.oozie.command.coord.CoordSuspendXCommand;
050import org.apache.oozie.service.DagXLogInfoService;
051import org.apache.oozie.service.Services;
052import org.apache.oozie.service.XLogService;
053import org.apache.oozie.util.ParamChecker;
054import org.apache.oozie.util.XLog;
055import org.apache.oozie.util.XLogStreamer;
056
057import com.google.common.annotations.VisibleForTesting;
058
059public class CoordinatorEngine extends BaseEngine {
060    private static XLog LOG = XLog.getLog(CoordinatorEngine.class);
061
062    /**
063     * Create a system Coordinator engine, with no user and no group.
064     */
065    public CoordinatorEngine() {
066        if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
067            LOG.debug("Oozie CoordinatorEngine is not using XCommands.");
068        }
069        else {
070            LOG.debug("Oozie CoordinatorEngine is using XCommands.");
071        }
072    }
073
074    /**
075     * Create a Coordinator engine to perform operations on behave of a user.
076     *
077     * @param user user name.
078     */
079    public CoordinatorEngine(String user) {
080        this();
081        this.user = ParamChecker.notEmpty(user, "user");
082    }
083
084    /*
085     * (non-Javadoc)
086     *
087     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
088     */
089    @Override
090    public String getDefinition(String jobId) throws BaseEngineException {
091        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
092        return job.getOrigJobXml();
093    }
094
095    /**
096     * @param jobId
097     * @return CoordinatorJobBean
098     * @throws BaseEngineException
099     */
100    private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
101        try {
102            return new CoordJobXCommand(jobId).call();
103        }
104        catch (CommandException ex) {
105            throw new BaseEngineException(ex);
106        }
107    }
108
109    /**
110     * @param actionId
111     * @return CoordinatorActionBean
112     * @throws BaseEngineException
113     */
114    public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
115        try {
116            return new CoordActionInfoXCommand(actionId).call();
117        }
118        catch (CommandException ex) {
119            throw new BaseEngineException(ex);
120        }
121    }
122
123    /*
124     * (non-Javadoc)
125     *
126     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
127     */
128    @Override
129    public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
130        try {
131            return new CoordJobXCommand(jobId).call();
132        }
133        catch (CommandException ex) {
134            throw new BaseEngineException(ex);
135        }
136    }
137
138    /*
139     * (non-Javadoc)
140     *
141     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
142     */
143    @Override
144    public CoordinatorJobBean getCoordJob(String jobId, String filter, int start, int length, boolean desc)
145            throws BaseEngineException {
146        List<String> filterList = parseStatusFilter(filter);
147        try {
148            return new CoordJobXCommand(jobId, filterList, start, length, desc)
149                    .call();
150        }
151        catch (CommandException ex) {
152            throw new BaseEngineException(ex);
153        }
154    }
155
156    /*
157     * (non-Javadoc)
158     *
159     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
160     */
161    @Override
162    public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
163        return null;
164    }
165
166    /*
167     * (non-Javadoc)
168     *
169     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
170     */
171    @Override
172    public void kill(String jobId) throws CoordinatorEngineException {
173        try {
174            new CoordKillXCommand(jobId).call();
175            LOG.info("User " + user + " killed the Coordinator job " + jobId);
176        }
177        catch (CommandException e) {
178            throw new CoordinatorEngineException(e);
179        }
180    }
181
182    /* (non-Javadoc)
183     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
184     */
185    @Override
186    public void change(String jobId, String changeValue) throws CoordinatorEngineException {
187        try {
188            new CoordChangeXCommand(jobId, changeValue).call();
189            LOG.info("User " + user + " changed the Coordinator job " + jobId + " to " + changeValue);
190        }
191        catch (CommandException e) {
192            throw new CoordinatorEngineException(e);
193        }
194    }
195
196    @Override
197    @Deprecated
198    public void reRun(String jobId, Configuration conf) throws BaseEngineException {
199        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
200    }
201
202    /**
203     * Rerun coordinator actions for given rerunType
204     *
205     * @param jobId
206     * @param rerunType
207     * @param scope
208     * @param refresh
209     * @param noCleanup
210     * @throws BaseEngineException
211     */
212    public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup)
213            throws BaseEngineException {
214        try {
215            return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
216                    noCleanup).call();
217        }
218        catch (CommandException ex) {
219            throw new BaseEngineException(ex);
220        }
221    }
222
223    /*
224     * (non-Javadoc)
225     *
226     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
227     */
228    @Override
229    public void resume(String jobId) throws CoordinatorEngineException {
230        try {
231            new CoordResumeXCommand(jobId).call();
232        }
233        catch (CommandException e) {
234            throw new CoordinatorEngineException(e);
235        }
236    }
237
238    @Override
239    @Deprecated
240    public void start(String jobId) throws BaseEngineException {
241        throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
242    }
243
244    /*
245     * (non-Javadoc)
246     *
247     * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String,
248     * java.io.Writer)
249     */
250    @Override
251    public void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException {
252        XLogStreamer.Filter filter = new XLogStreamer.Filter();
253        filter.setParameter(DagXLogInfoService.JOB, jobId);
254
255        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
256        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
257    }
258
259    /**
260     * Add list of actions to the filter based on conditions
261     *
262     * @param jobId Job Id
263     * @param logRetrievalScope Value for the retrieval type
264     * @param logRetrievalType Based on which filter criteria the log is retrieved
265     * @param writer writer to stream the log to
266     * @throws IOException
267     * @throws BaseEngineException
268     * @throws CommandException
269     */
270    public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer)
271            throws IOException, BaseEngineException, CommandException {
272        XLogStreamer.Filter filter = new XLogStreamer.Filter();
273        filter.setParameter(DagXLogInfoService.JOB, jobId);
274        if (logRetrievalScope != null && logRetrievalType != null) {
275            // if coordinator action logs are to be retrieved based on action id range
276            if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
277                // Use set implementation that maintains order or elements to achieve reproducibility:
278                Set<String> actionSet = new LinkedHashSet<String>();
279                String[] list = logRetrievalScope.split(",");
280                for (String s : list) {
281                    s = s.trim();
282                    if (s.contains("-")) {
283                        String[] range = s.split("-");
284                        if (range.length != 2) {
285                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
286                                    + "'");
287                        }
288                        int start;
289                        int end;
290                        try {
291                            start = Integer.parseInt(range[0].trim());
292                        } catch (NumberFormatException ne) {
293                            throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
294                                    ne);
295                        }
296                        try {
297                            end = Integer.parseInt(range[1].trim());
298                        } catch (NumberFormatException ne) {
299                            throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
300                                    ne);
301                        }
302                        if (start > end) {
303                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
304                        }
305                        for (int i = start; i <= end; i++) {
306                            actionSet.add(jobId + "@" + i);
307                        }
308                    }
309                    else {
310                        try {
311                            Integer.parseInt(s);
312                        }
313                        catch (NumberFormatException ne) {
314                            throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
315                                    + "'. Integer only.");
316                        }
317                        actionSet.add(jobId + "@" + s);
318                    }
319                }
320
321                Iterator<String> actionsIterator = actionSet.iterator();
322                StringBuilder orSeparatedActions = new StringBuilder("");
323                boolean orRequired = false;
324                while (actionsIterator.hasNext()) {
325                    if (orRequired) {
326                        orSeparatedActions.append("|");
327                    }
328                    orSeparatedActions.append(actionsIterator.next().toString());
329                    orRequired = true;
330                }
331                if (actionSet.size() > 1 && orRequired) {
332                    orSeparatedActions.insert(0, "(");
333                    orSeparatedActions.append(")");
334                }
335                filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
336            }
337            // if coordinator action logs are to be retrieved based on date range
338            // this block gets the corresponding list of coordinator actions to be used by the log filter
339            if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
340                List<String> coordActionIdList = null;
341                try {
342                    coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
343                }
344                catch (XException xe) {
345                    throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
346                }
347                StringBuilder orSeparatedActions = new StringBuilder("");
348                boolean orRequired = false;
349                for (String coordActionId : coordActionIdList) {
350                    if (orRequired) {
351                        orSeparatedActions.append("|");
352                    }
353                    orSeparatedActions.append(coordActionId);
354                    orRequired = true;
355                }
356                if (coordActionIdList.size() > 1 && orRequired) {
357                    orSeparatedActions.insert(0, "(");
358                    orSeparatedActions.append(")");
359                }
360                filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
361            }
362        }
363        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
364        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
365    }
366
367    /*
368     * (non-Javadoc)
369     *
370     * @see
371     * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
372     * , boolean)
373     */
374    @Override
375    public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
376        try {
377            CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
378            return submit.call();
379        }
380        catch (CommandException ex) {
381            throw new CoordinatorEngineException(ex);
382        }
383    }
384
385    /*
386     * (non-Javadoc)
387     *
388     * @see
389     * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
390     */
391    @Override
392    public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
393        try {
394            CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
395            return submit.call();
396        }
397        catch (CommandException ex) {
398            throw new CoordinatorEngineException(ex);
399        }
400    }
401
402    /*
403     * (non-Javadoc)
404     *
405     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
406     */
407    @Override
408    public void suspend(String jobId) throws CoordinatorEngineException {
409        try {
410            new CoordSuspendXCommand(jobId).call();
411        }
412        catch (CommandException e) {
413            throw new CoordinatorEngineException(e);
414        }
415
416    }
417
418    /*
419     * (non-Javadoc)
420     *
421     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
422     */
423    @Override
424    public WorkflowJob getJob(String jobId) throws BaseEngineException {
425        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
426    }
427
428    /*
429     * (non-Javadoc)
430     *
431     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
432     */
433    @Override
434    public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
435        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
436    }
437
438    private static final Set<String> FILTER_NAMES = new HashSet<String>();
439
440    static {
441        FILTER_NAMES.add(OozieClient.FILTER_USER);
442        FILTER_NAMES.add(OozieClient.FILTER_NAME);
443        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
444        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
445        FILTER_NAMES.add(OozieClient.FILTER_ID);
446        FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
447        FILTER_NAMES.add(OozieClient.FILTER_UNIT);
448    }
449
450    /**
451     * @param filter
452     * @param start
453     * @param len
454     * @return CoordinatorJobInfo
455     * @throws CoordinatorEngineException
456     */
457    public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
458        Map<String, List<String>> filterList = parseFilter(filter);
459
460        try {
461            return new CoordJobsXCommand(filterList, start, len).call();
462        }
463        catch (CommandException ex) {
464            throw new CoordinatorEngineException(ex);
465        }
466    }
467
468
469    // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
470    private List<String> parseStatusFilter(String filter) throws CoordinatorEngineException {
471        List<String> filterList = new ArrayList<String>();
472        if (filter != null) {
473            //split name;value pairs
474            StringTokenizer st = new StringTokenizer(filter, ";");
475            while (st.hasMoreTokens()) {
476                String token = st.nextToken();
477                if (token.contains("=")) {
478                    String[] pair = token.split("=");
479                    if (pair.length != 2) {
480                        throw new CoordinatorEngineException(ErrorCode.E0421, token,
481                                "elements must be name=value pairs");
482                    }
483                    if (pair[0].equalsIgnoreCase("status")) {
484                        String statusValue = pair[1];
485                        try {
486                            CoordinatorAction.Status.valueOf(statusValue);
487                        } catch (IllegalArgumentException ex) {
488                            StringBuilder validStatusList = new StringBuilder();
489                            for (CoordinatorAction.Status status: CoordinatorAction.Status.values()){
490                                validStatusList.append(status.toString()+" ");
491                            }
492                            // Check for incorrect status value
493                            throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
494                                "invalid status value [{0}]." + " Valid status values are: [{1}]", statusValue, validStatusList));
495                        }
496                        filterList.add(statusValue);
497                    } else {
498                        // Check for incorrect filter option
499                        throw new CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
500                                "invalid filter [{0}]." + " The only valid filter is \"status\"", pair[0]));
501                    }
502                } else {
503                    throw new CoordinatorEngineException(ErrorCode.E0421, token,
504                             "elements must be name=value pairs");
505                }
506            }
507        }
508        return filterList;
509    }
510
511    /**
512     * @param filter
513     * @return Map<String, List<String>>
514     * @throws CoordinatorEngineException
515     */
516    @VisibleForTesting
517    Map<String, List<String>> parseFilter(String filter) throws CoordinatorEngineException {
518        Map<String, List<String>> map = new HashMap<String, List<String>>();
519        boolean isTimeUnitSpecified = false;
520        String timeUnit = "MINUTE";
521        boolean isFrequencySpecified = false;
522        String frequency = "";
523        if (filter != null) {
524            StringTokenizer st = new StringTokenizer(filter, ";");
525            while (st.hasMoreTokens()) {
526                String token = st.nextToken();
527                if (token.contains("=")) {
528                    String[] pair = token.split("=");
529                    if (pair.length != 2) {
530                        throw new CoordinatorEngineException(ErrorCode.E0420, filter,
531                                "elements must be name=value pairs");
532                    }
533                    if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
534                        throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
535                                pair[0]));
536                    }
537                    if (pair[0].equalsIgnoreCase("frequency")) {
538                        isFrequencySpecified = true;
539                        try {
540                            frequency = (int) Float.parseFloat(pair[1]) + "";
541                            continue;
542                        }
543                        catch (NumberFormatException NANException) {
544                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
545                                    "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
546                        }
547                    }
548                    if (pair[0].equalsIgnoreCase("unit")) {
549                        isTimeUnitSpecified = true;
550                        timeUnit = pair[1];
551                        if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
552                                && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
553                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
554                                    "invalid value [{0}] for time unit. "
555                                            + "Valid value is one of months, days, hours or minutes", pair[1]));
556                        }
557                        continue;
558                    }
559                    if (pair[0].equals("status")) {
560                        try {
561                            CoordinatorJob.Status.valueOf(pair[1]);
562                        }
563                        catch (IllegalArgumentException ex) {
564                            throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
565                                    "invalid status [{0}]", pair[1]));
566                        }
567                    }
568                    List<String> list = map.get(pair[0]);
569                    if (list == null) {
570                        list = new ArrayList<String>();
571                        map.put(pair[0], list);
572                    }
573                    list.add(pair[1]);
574                } else {
575                    throw new CoordinatorEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
576                }
577            }
578            // Unit is specified and frequency is not specified
579            if (!isFrequencySpecified && isTimeUnitSpecified) {
580                throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
581                        + "frequency is specified. Either specify frequency also or else remove the time unit");
582            } else if (isFrequencySpecified) {
583                // Frequency value is specified
584                if (isTimeUnitSpecified) {
585                    if (timeUnit.equalsIgnoreCase("months")) {
586                        timeUnit = "MONTH";
587                    } else if (timeUnit.equalsIgnoreCase("days")) {
588                        timeUnit = "DAY";
589                    } else if (timeUnit.equalsIgnoreCase("hours")) {
590                        // When job details are persisted to database, frequency in hours are converted to minutes.
591                        // This conversion is to conform with that.
592                        frequency = Integer.parseInt(frequency) * 60 + "";
593                        timeUnit = "MINUTE";
594                    } else if (timeUnit.equalsIgnoreCase("minutes")) {
595                        timeUnit = "MINUTE";
596                    }
597                }
598                // Adding the frequency and time unit filters to the filter map
599                List<String> list = new ArrayList<String>();
600                list.add(timeUnit);
601                map.put("unit", list);
602                list = new ArrayList<String>();
603                list.add(frequency);
604                map.put("frequency", list);
605            }
606        }
607        return map;
608    }
609}