001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.IOException;
021import java.io.Writer;
022import java.text.ParseException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.StringTokenizer;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.oozie.client.CoordinatorAction;
034import org.apache.oozie.client.CoordinatorJob;
035import org.apache.oozie.client.Job;
036import org.apache.oozie.client.OozieClient;
037import org.apache.oozie.client.WorkflowJob;
038import org.apache.oozie.client.rest.BulkResponseImpl;
039import org.apache.oozie.command.BulkJobsXCommand;
040import org.apache.oozie.command.CommandException;
041import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
042import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
043import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
044import org.apache.oozie.command.bundle.BundleJobXCommand;
045import org.apache.oozie.command.bundle.BundleJobsXCommand;
046import org.apache.oozie.command.bundle.BundleKillXCommand;
047import org.apache.oozie.command.bundle.BundleRerunXCommand;
048import org.apache.oozie.command.bundle.BundleStartXCommand;
049import org.apache.oozie.command.bundle.BundleSubmitXCommand;
050import org.apache.oozie.service.DagXLogInfoService;
051import org.apache.oozie.service.Services;
052import org.apache.oozie.service.XLogService;
053import org.apache.oozie.util.DateUtils;
054import org.apache.oozie.util.ParamChecker;
055import org.apache.oozie.util.XLog;
056import org.apache.oozie.util.XLogStreamer;
057
058import com.google.common.annotations.VisibleForTesting;
059
060public class BundleEngine extends BaseEngine {
061    /**
062     * Create a system Bundle engine, with no user and no group.
063     */
064    public BundleEngine() {
065    }
066
067    /**
068     * Create a Bundle engine to perform operations on behave of a user.
069     *
070     * @param user user name.
071     */
072    public BundleEngine(String user) {
073        this.user = ParamChecker.notEmpty(user, "user");
074    }
075
076    /* (non-Javadoc)
077     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
078     */
079    @Override
080    public void change(String jobId, String changeValue) throws BundleEngineException {
081        try {
082            BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
083            change.call();
084        }
085        catch (CommandException ex) {
086            throw new BundleEngineException(ex);
087        }
088    }
089
090    /* (non-Javadoc)
091     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
092     */
093    @Override
094    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
095        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
096        try {
097            String jobId = submit.call();
098            return jobId;
099        }
100        catch (CommandException ex) {
101            throw new BundleEngineException(ex);
102        }
103    }
104
105    /* (non-Javadoc)
106     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
107     */
108    @Override
109    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
110        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
111    }
112
113    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
114        try {
115            return new BundleJobXCommand(jobId).call();
116        }
117        catch (CommandException ex) {
118            throw new BundleEngineException(ex);
119        }
120    }
121
122    /* (non-Javadoc)
123     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
124     */
125    @Override
126    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
127            throws BundleEngineException {
128        throw new BundleEngineException(new XException(ErrorCode.E0301,
129                "cannot get a coordinator job from BundleEngine"));
130    }
131
132    /* (non-Javadoc)
133     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
134     */
135    @Override
136    public String getDefinition(String jobId) throws BundleEngineException {
137        BundleJobBean job;
138        try {
139            job = new BundleJobXCommand(jobId).call();
140        }
141        catch (CommandException ex) {
142            throw new BundleEngineException(ex);
143        }
144        return job.getOrigJobXml();
145    }
146
147    /* (non-Javadoc)
148     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
149     */
150    @Override
151    public WorkflowJob getJob(String jobId) throws BundleEngineException {
152        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
153    }
154
155    /* (non-Javadoc)
156     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
157     */
158    @Override
159    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
160        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
161    }
162
163    /* (non-Javadoc)
164     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
165     */
166    @Override
167    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
168        return null;
169    }
170
171    /* (non-Javadoc)
172     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
173     */
174    @Override
175    public void kill(String jobId) throws BundleEngineException {
176        try {
177            new BundleKillXCommand(jobId).call();
178        }
179        catch (CommandException e) {
180            throw new BundleEngineException(e);
181        }
182    }
183
184    /* (non-Javadoc)
185     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
186     */
187    @Override
188    @Deprecated
189    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
190        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
191    }
192
193    /**
194     * Rerun Bundle actions for given rerunType
195     *
196     * @param jobId bundle job id
197     * @param coordScope the rerun scope for coordinator job names separated by ","
198     * @param dateScope the rerun scope for coordinator nominal times separated by ","
199     * @param refresh true if user wants to refresh input/outpur dataset urls
200     * @param noCleanup false if user wants to cleanup output events for given rerun actions
201     * @throws BaseEngineException thrown if failed to rerun
202     */
203    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
204            throws BaseEngineException {
205        try {
206            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
207        }
208        catch (CommandException ex) {
209            throw new BaseEngineException(ex);
210        }
211    }
212
213    /* (non-Javadoc)
214     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
215     */
216    @Override
217    public void resume(String jobId) throws BundleEngineException {
218        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
219        try {
220            resume.call();
221        }
222        catch (CommandException ex) {
223            throw new BundleEngineException(ex);
224        }
225    }
226
227    /* (non-Javadoc)
228     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
229     */
230    @Override
231    public void start(String jobId) throws BundleEngineException {
232        try {
233            new BundleStartXCommand(jobId).call();
234        }
235        catch (CommandException e) {
236            throw new BundleEngineException(e);
237        }
238    }
239
240    /* (non-Javadoc)
241     * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
242     */
243    @Override
244    public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
245        XLogStreamer.Filter filter = new XLogStreamer.Filter();
246        filter.setParameter(DagXLogInfoService.JOB, jobId);
247
248        BundleJobBean job;
249        try {
250            job = new BundleJobXCommand(jobId).call();
251        }
252        catch (CommandException ex) {
253            throw new BundleEngineException(ex);
254        }
255
256        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
257    }
258
259    /* (non-Javadoc)
260     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
261     */
262    @Override
263    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
264        try {
265            String jobId = new BundleSubmitXCommand(conf).call();
266
267            if (startJob) {
268                start(jobId);
269            }
270            return jobId;
271        }
272        catch (CommandException ex) {
273            throw new BundleEngineException(ex);
274        }
275    }
276
277    /* (non-Javadoc)
278     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
279     */
280    @Override
281    public void suspend(String jobId) throws BundleEngineException {
282        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
283        try {
284            suspend.call();
285        }
286        catch (CommandException ex) {
287            throw new BundleEngineException(ex);
288        }
289    }
290
291    private static final Set<String> FILTER_NAMES = new HashSet<String>();
292
293    static {
294        FILTER_NAMES.add(OozieClient.FILTER_USER);
295        FILTER_NAMES.add(OozieClient.FILTER_NAME);
296        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
297        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
298        FILTER_NAMES.add(OozieClient.FILTER_ID);
299    }
300
301    /**
302     * Get bundle jobs
303     *
304     * @param filter the filter string
305     * @param start start location for paging
306     * @param len total length to get
307     * @return bundle job info
308     * @throws BundleEngineException thrown if failed to get bundle job info
309     */
310    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
311        Map<String, List<String>> filterList = parseFilter(filter);
312
313        try {
314            return new BundleJobsXCommand(filterList, start, len).call();
315        }
316        catch (CommandException ex) {
317            throw new BundleEngineException(ex);
318        }
319    }
320
321    /**
322     * Parse filter string to a map with key = filter name and values = filter values
323     *
324     * @param filter the filter string
325     * @return filter key and value map
326     * @throws CoordinatorEngineException thrown if failed to parse filter string
327     */
328    @VisibleForTesting
329    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
330        Map<String, List<String>> map = new HashMap<String, List<String>>();
331        if (filter != null) {
332            StringTokenizer st = new StringTokenizer(filter, ";");
333            while (st.hasMoreTokens()) {
334                String token = st.nextToken();
335                if (token.contains("=")) {
336                    String[] pair = token.split("=");
337                    if (pair.length != 2) {
338                        throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
339                    }
340                    if (!FILTER_NAMES.contains(pair[0])) {
341                        throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
342                                pair[0]));
343                    }
344                    if (pair[0].equals("status")) {
345                        try {
346                            Job.Status.valueOf(pair[1]);
347                        }
348                        catch (IllegalArgumentException ex) {
349                            throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
350                                    "invalid status [{0}]", pair[1]));
351                        }
352                    }
353                    List<String> list = map.get(pair[0]);
354                    if (list == null) {
355                        list = new ArrayList<String>();
356                        map.put(pair[0], list);
357                    }
358                    list.add(pair[1]);
359                }
360                else {
361                    throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
362                }
363            }
364        }
365        return map;
366    }
367
368    /**
369     * Get bulk job response
370     *
371     * @param filter the filter string
372     * @param start start location for paging
373     * @param len total length to get
374     * @return bulk job info
375     * @throws BundleEngineException thrown if failed to get bulk job info
376     */
377    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
378        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
379        try {
380            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
381        }
382        catch (CommandException ex) {
383            throw new BundleEngineException(ex);
384        }
385    }
386
387    /**
388     * Parse filter string to a map with key = filter name and values = filter values
389     * Allowed keys are defined as constants on top
390     *
391     * @param filter the filter string
392     * @return filter key-value pair map
393     * @throws BundleEngineException thrown if failed to parse filter string
394     */
395    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
396
397        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
398        // Functionality can be extended to different job levels - TODO extend filter parser and query
399        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
400        if (bulkFilter != null) {
401            StringTokenizer st = new StringTokenizer(bulkParams, ";");
402            while (st.hasMoreTokens()) {
403                String token = st.nextToken();
404                if (token.contains("=")) {
405                    String[] pair = token.split("=");
406                    if (pair.length != 2) {
407                        throw new BundleEngineException(ErrorCode.E0420, token,
408                                "elements must be name=value pairs");
409                    }
410                    pair[0] = pair[0].toLowerCase();
411                    String[] values = pair[1].split(",");
412                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
413                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
414                                pair[0]));
415                    }
416                    // special check and processing for time related params
417                    if (pair[0].contains("time")) {
418                        try {
419                            DateUtils.parseDateUTC(pair[1]);
420                        }
421                        catch (ParseException e) {
422                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
423                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
424                                    DateUtils.ISO8601_UTC_MASK));
425                        }
426                    }
427                    // special check for action status param
428                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
429                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
430                        for(String value : values) {
431                            try {
432                                CoordinatorAction.Status.valueOf(value);
433                            }
434                            catch (IllegalArgumentException ex) {
435                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
436                                        "invalid action status [{0}]", value));
437                            }
438                        }
439                    }
440                    // eventually adding into map for all cases e.g. names, times, status
441                    List<String> list = bulkFilter.get(pair[0]);
442                    if (list == null) {
443                        list = new ArrayList<String>();
444                        bulkFilter.put(pair[0], list);
445                    }
446                    for(String value : values) {
447                        value = value.trim();
448                        if(value.isEmpty()) {
449                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
450                        }
451                        list.add(value);
452                    }
453                } else {
454                    throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
455                }
456            }
457            if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
458                throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
459            }
460        }
461        return bulkFilter;
462    }
463}