001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.BundleActionBean;
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.WorkflowActionBean;
033import org.apache.oozie.client.Job;
034import org.apache.oozie.client.OozieClient;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038import org.apache.oozie.command.coord.CoordActionStartXCommand;
039import org.apache.oozie.command.coord.CoordKillXCommand;
040import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
041import org.apache.oozie.command.coord.CoordResumeXCommand;
042import org.apache.oozie.command.coord.CoordSubmitXCommand;
043import org.apache.oozie.command.coord.CoordSuspendXCommand;
044import org.apache.oozie.command.wf.ActionEndXCommand;
045import org.apache.oozie.command.wf.ActionStartXCommand;
046import org.apache.oozie.command.wf.KillXCommand;
047import org.apache.oozie.command.wf.ResumeXCommand;
048import org.apache.oozie.command.wf.SignalXCommand;
049import org.apache.oozie.command.wf.SuspendXCommand;
050import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
051import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
052import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
053import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
054import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
055import org.apache.oozie.executor.jpa.JPAExecutorException;
056import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
057import org.apache.oozie.util.JobUtils;
058import org.apache.oozie.util.XCallable;
059import org.apache.oozie.util.XConfiguration;
060import org.apache.oozie.util.XLog;
061import org.apache.oozie.util.XmlUtils;
062import org.jdom.Attribute;
063import org.jdom.Element;
064
065/**
 * The Recovery Service periodically checks for workflow actions, coordinator actions and bundle actions that have
 * been pending for longer than a configured age and then queues commands to resume their execution.
068 */
069public class RecoveryService implements Service {
070
    /**
     * Prefix of the configuration properties that control the recovery service itself.
     */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
    /**
     * Prefix of the configuration properties that control workflow action recovery.
     */
    public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
    /**
     * Prefix of the configuration properties that control coordinator recovery.
     */
    public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
    /**
     * Prefix of the configuration properties that control bundle recovery.
     */
    public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
    /**
     * Time interval, in seconds, at which the recovery service will be scheduled to run. Defaults to 60 when not
     * configured.
     */
    public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
    /**
     * The number of callables to be queued in a batch. Defaults to 10 when not configured.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";

    /**
     * Delay for the push missing dependencies in milliseconds. The first recovered push dependency check is queued
     * with this delay and every following one with the previous delay plus this value. Defaults to 200 when not
     * configured.
     */
    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = CONF_PREFIX + "push.dependency.interval";

    /**
     * Age of workflow actions to queue, in seconds. Defaults to 120 when not configured.
     */
    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
    /**
     * Age of coordinator actions to recover, in seconds. Defaults to 600 when not configured.
     */
    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";

    /**
     * Age of bundle actions to recover, in seconds. Defaults to 600 when not configured.
     */
    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";

    // Instrumentation group and counter names; each counter is incremented once per action examined by the
    // corresponding recovery pass.
    private static final String INSTRUMENTATION_GROUP = "recovery";
    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
107
108
109    /**
110     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
111     * queuing of commands.
112     */
113    static class RecoveryRunnable implements Runnable {
        // Age thresholds handed to the JPA executors that select workflow, coordinator and bundle actions.
        private final long olderThan;
        private final long coordOlderThan;
        private final long bundleOlderThan;
        // Largest delay requested for any callable currently held in delayedCallables; reset to 0 after each flush.
        private long delay = 0;
        // Pending batch of callables to be queued without delay; created lazily, null when nothing is pending.
        private List<XCallable<?>> callables;
        // Pending batch of callables to be queued with 'delay'; created lazily, null when nothing is pending.
        private List<XCallable<?>> delayedCallables;
        // Summary of the counts found in the current run; recreated at the start of every run().
        private StringBuilder msg = null;
        // Looked up from Services at the start of every run().
        private JPAService jpaService = null;
122
123        public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
124            this.olderThan = olderThan;
125            this.coordOlderThan = coordOlderThan;
126            this.bundleOlderThan = bundleOlderThan;
127        }
128
129        public void run() {
130            XLog.Info.get().clear();
131            XLog log = XLog.getLog(getClass());
132            msg = new StringBuilder();
133            jpaService = Services.get().get(JPAService.class);
134            runWFRecovery();
135            runCoordActionRecovery();
136            runCoordActionRecoveryForReady();
137            runBundleRecovery();
138            log.debug("QUEUING [{0}] for potential recovery", msg.toString());
139            boolean ret = false;
140            if (null != callables) {
141                ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
142                if (ret == false) {
143                    log.warn("Unable to queue the callables commands for RecoveryService. "
144                            + "Most possibly command queue is full. Queue size is :"
145                            + Services.get().get(CallableQueueService.class).queueSize());
146                }
147                callables = null;
148            }
149            if (null != delayedCallables) {
150                ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
151                if (ret == false) {
152                    log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
153                            + "Most possibly Callable queue is full. Queue size is :"
154                            + Services.get().get(CallableQueueService.class).queueSize());
155                }
156                delayedCallables = null;
157                this.delay = 0;
158            }
159        }
160
161        private void runBundleRecovery(){
162            XLog.Info.get().clear();
163            XLog log = XLog.getLog(getClass());
164            List<BundleActionBean> bactions = null;
165            try {
166                bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
167            }
168            catch (JPAExecutorException ex) {
169                log.warn("Error reading bundle actions from database", ex);
170                return;
171            }
172            msg.append(", BUNDLE_ACTIONS : " + bactions.size());
173            for (BundleActionBean baction : bactions) {
174                try {
175                    Services.get().get(InstrumentationService.class).get()
176                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
177                    if (baction.getCoordId() == null) {
178                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
179                        continue;
180                    }
181                    if (baction.getStatus() == Job.Status.PREP) {
182                        BundleJobBean bundleJob = null;
183
184                        if (jpaService != null) {
185                            bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
186                        }
187                        if (bundleJob != null) {
188                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
189                            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
190                            for (Element coordElem : coordElems) {
191                                Attribute name = coordElem.getAttribute("name");
192                                if (name.getValue().equals(baction.getCoordName())) {
193                                    Configuration coordConf = mergeConfig(coordElem, bundleJob);
194                                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
195                                    queueCallable(new CoordSubmitXCommand(coordConf,
196                                            bundleJob.getId(), name.getValue()));
197                                }
198                            }
199                        }
200
201                    }
202                    else if (baction.getStatus() == Job.Status.KILLED) {
203                        queueCallable(new CoordKillXCommand(baction.getCoordId()));
204                    }
205                    else if (baction.getStatus() == Job.Status.SUSPENDED
206                            || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
207                        queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
208                    }
209                    else if (baction.getStatus() == Job.Status.RUNNING
210                            || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
211                        queueCallable(new CoordResumeXCommand(baction.getCoordId()));
212                    }
213                }
214                catch (Exception ex) {
215                    log.error("Exception, {0}", ex.getMessage(), ex);
216                }
217            }
218
219        }
220
221        /**
222         * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
223         */
224        private void runCoordActionRecovery() {
225            XLog.Info.get().clear();
226            XLog log = XLog.getLog(getClass());
227            long pushMissingDepInterval = Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_INTERVAL, 200);
228            long pushMissingDepDelay = pushMissingDepInterval;
229            List<CoordinatorActionBean> cactions = null;
230            try {
231                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
232            }
233            catch (JPAExecutorException ex) {
234                log.warn("Error reading coord actions from database", ex);
235                return;
236            }
237            msg.append(", COORD_ACTIONS : " + cactions.size());
238            for (CoordinatorActionBean caction : cactions) {
239                try {
240                    Services.get().get(InstrumentationService.class).get()
241                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
242                    if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
243                        queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
244                        log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
245                                + caction.getId());
246                        if (caction.getPushMissingDependencies() != null
247                                && caction.getPushMissingDependencies().length() != 0) {
248                            queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
249                                    pushMissingDepDelay);
250                            pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
251                            log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
252                                    + caction.getId());
253                        }
254                    }
255                    else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
256                        CoordinatorJobBean coordJob = jpaService
257                                .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
258                        queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
259                                coordJob.getAppName(), caction.getJobId()));
260
261                        log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
262                                + caction.getId());
263                    }
264                    else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
265                        if (caction.getExternalId() != null) {
266                            queueCallable(new SuspendXCommand(caction.getExternalId()));
267                            log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
268                                    + caction.getId());
269                        }
270                    }
271                    else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
272                        if (caction.getExternalId() != null) {
273                            queueCallable(new KillXCommand(caction.getExternalId()));
274                            log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
275                        }
276                    }
277                    else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
278                        if (caction.getExternalId() != null) {
279                            queueCallable(new ResumeXCommand(caction.getExternalId()));
280                            log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
281                        }
282                    }
283                }
284                catch (Exception ex) {
285                    log.error("Exception, {0}", ex.getMessage(), ex);
286                }
287            }
288
289
290        }
291
292        /**
293         * Recover coordinator actions that are staying in READY too long
294         */
295        private void runCoordActionRecoveryForReady() {
296            XLog.Info.get().clear();
297            XLog log = XLog.getLog(getClass());
298
299            try {
300                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
301                msg.append(", COORD_READY_JOBS : " + jobids.size());
302                for (String jobid : jobids) {
303                        queueCallable(new CoordActionReadyXCommand(jobid));
304
305                    log.info("Recover READY coord actions for jobid :" + jobid);
306                }
307            }
308            catch (Exception ex) {
309                log.error("Exception, {0}", ex.getMessage(), ex);
310            }
311        }
312
313        /**
314         * Recover wf actions
315         */
316        private void runWFRecovery() {
317            XLog.Info.get().clear();
318            XLog log = XLog.getLog(getClass());
319            // queue command for action recovery
320            List<WorkflowActionBean> actions = null;
321            try {
322                actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
323            }
324            catch (JPAExecutorException ex) {
325                log.warn("Exception while reading pending actions from storage", ex);
326                return;
327            }
328            // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
329            // actions.size());
330            msg.append(" WF_ACTIONS " + actions.size());
331
332            for (WorkflowActionBean action : actions) {
333                try {
334                    Services.get().get(InstrumentationService.class).get()
335                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
336                    if (action.getStatus() == WorkflowActionBean.Status.PREP
337                            || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
338                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
339                    }
340                    else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
341                        Date nextRunTime = action.getPendingAge();
342                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
343                                - System.currentTimeMillis());
344                    }
345                    else if (action.getStatus() == WorkflowActionBean.Status.DONE
346                            || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
347                        queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
348                    }
349                    else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
350                        Date nextRunTime = action.getPendingAge();
351                        queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
352                                - System.currentTimeMillis());
353
354                    }
355                    else if (action.getStatus() == WorkflowActionBean.Status.OK
356                            || action.getStatus() == WorkflowActionBean.Status.ERROR) {
357                        queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
358                    }
359                    else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
360                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
361                    }
362                }
363                catch (Exception ex) {
364                    log.error("Exception, {0}", ex.getMessage(), ex);
365                }
366            }
367
368        }
369
370        /**
371         * Adds callables to a list. If the number of callables in the list reaches {@link
372         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
373         *
374         * @param callable the callable to queue.
375         */
376        private void queueCallable(XCallable<?> callable) {
377            if (callables == null) {
378                callables = new ArrayList<XCallable<?>>();
379            }
380            callables.add(callable);
381            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
382                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
383                if (ret == false) {
384                    XLog.getLog(getClass()).warn(
385                            "Unable to queue the callables commands for RecoveryService. "
386                                    + "Most possibly command queue is full. Queue size is :"
387                                    + Services.get().get(CallableQueueService.class).queueSize());
388                }
389                callables = new ArrayList<XCallable<?>>();
390            }
391        }
392
393        /**
394         * Adds callables to a list. If the number of callables in the list reaches {@link
395         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
396         * of the callables in the list. The callables list and the delay is reset.
397         *
398         * @param callable the callable to queue.
399         * @param delay the delay for the callable.
400         */
401        private void queueCallable(XCallable<?> callable, long delay) {
402            if (delayedCallables == null) {
403                delayedCallables = new ArrayList<XCallable<?>>();
404            }
405            this.delay = Math.max(this.delay, delay);
406            delayedCallables.add(callable);
407            if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
408                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
409                if (ret == false) {
410                    XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
411                            + "Most possibly Callable queue is full. Queue size is :"
412                            + Services.get().get(CallableQueueService.class).queueSize());
413                }
414                delayedCallables = new ArrayList<XCallable<?>>();
415                this.delay = 0;
416            }
417        }
418    }
419
420    /**
421     * Initializes the RecoveryService.
422     *
423     * @param services services instance.
424     */
425    @Override
426    public void init(Services services) {
427        Configuration conf = services.getConf();
428        Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
429                CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
430        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
431                                                      SchedulerService.Unit.SEC);
432    }
433
434    public int getRecoveryServiceInterval(Configuration conf){
435        return conf.getInt(CONF_SERVICE_INTERVAL, 60);
436    }
437
438    /**
439     * Destroy the Recovery Service.
440     */
441    @Override
442    public void destroy() {
443    }
444
445    /**
446     * Return the public interface for the Recovery Service.
447     *
448     * @return {@link RecoveryService}.
449     */
450    @Override
451    public Class<? extends Service> getInterface() {
452        return RecoveryService.class;
453    }
454
455    /**
456     * Merge Bundle job config and the configuration from the coord job to pass
457     * to Coord Engine
458     *
459     * @param coordElem the coordinator configuration
460     * @return Configuration merged configuration
461     * @throws CommandException thrown if failed to merge configuration
462     */
463    private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
464        XLog.Info.get().clear();
465        XLog log = XLog.getLog("RecoveryService");
466
467        String jobConf = bundleJob.getConf();
468        // Step 1: runConf = jobConf
469        Configuration runConf = null;
470        try {
471            runConf = new XConfiguration(new StringReader(jobConf));
472        }
473        catch (IOException e1) {
474            log.warn("Configuration parse error in:" + jobConf);
475            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
476        }
477        // Step 2: Merge local properties into runConf
478        // extract 'property' tags under 'configuration' block in the coordElem
479        // convert Element to XConfiguration
480        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
481
482        if (localConfigElement != null) {
483            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
484            Configuration localConf;
485            try {
486                localConf = new XConfiguration(new StringReader(strConfig));
487            }
488            catch (IOException e1) {
489                log.warn("Configuration parse error in:" + strConfig);
490                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
491            }
492
493            // copy configuration properties in the coordElem to the runConf
494            XConfiguration.copy(localConf, runConf);
495        }
496
497        // Step 3: Extract value of 'app-path' in coordElem, save it as a
498        // new property called 'oozie.coord.application.path', and normalize.
499        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
500        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
501        // Normalize coordinator appPath here;
502        try {
503            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
504        }
505        catch (IOException e) {
506            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
507        }
508        return runConf;
509    }
510}