001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.Date;
021import java.util.List;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.oozie.BundleJobBean;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.command.bundle.BundlePauseXCommand;
027import org.apache.oozie.command.bundle.BundleStartXCommand;
028import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
029import org.apache.oozie.command.coord.CoordPauseXCommand;
030import org.apache.oozie.command.coord.CoordUnpauseXCommand;
031import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
032import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
033import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
034import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
035import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
036import org.apache.oozie.service.SchedulerService;
037import org.apache.oozie.service.Service;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.util.MemoryLocks;
040import org.apache.oozie.util.XLog;
041
/**
 * PauseTransitService schedules a runnable that runs at the configured interval; it checks all bundles to see if
 * they should be paused, un-paused or started, and all coordinators to see if they should be paused or un-paused.
 */
046public class PauseTransitService implements Service {
047    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService.";
048    public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval";
049    private final static XLog LOG = XLog.getLog(PauseTransitService.class);
050
051    /**
052     * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all
053     * bundles to see if they should be paused, un-paused or started.
054     */
055    static class PauseTransitRunnable implements Runnable {
056        private JPAService jpaService = null;
057        private MemoryLocks.LockToken lock;
058
059        public PauseTransitRunnable() {
060            jpaService = Services.get().get(JPAService.class);
061            if (jpaService == null) {
062                LOG.error("Missing JPAService");
063            }
064        }
065
066        public void run() {
067            try {
068                // first check if there is some other running instance from the same service;
069                lock = Services.get().get(MemoryLocksService.class).getWriteLock(
070                        PauseTransitService.class.getName(), lockTimeout);
071                if (lock == null) {
072                    LOG.info("This PauseTransitService instance will"
073                            + "not run since there is already an instance running");
074                }
075                else {
076                    LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
077
078                    updateBundle();
079                    updateCoord();
080                }
081            }
082            catch (Exception ex) {
083                LOG.warn("Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
084            }
085            finally {
086                // release lock;
087                if (lock != null) {
088                    lock.release();
089                    LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
090                }
091            }
092        }
093
094        private void updateBundle() {
095            Date d = new Date(); // records the start time of this service run;
096            List<BundleJobBean> jobList = null;
097            // pause bundles as needed;
098            try {
099                jobList = jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
100                if (jobList != null) {
101                    for (BundleJobBean bundleJob : jobList) {
102                        if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) {
103                            new BundlePauseXCommand(bundleJob).call();
104                            LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId());
105                        }
106                    }
107                }
108            }
109            catch (Exception ex) {
110                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
111            }
112            // unpause bundles as needed;
113            try {
114                jobList = jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
115                if (jobList != null) {
116                    for (BundleJobBean bundleJob : jobList) {
117                        if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) {
118                            new BundleUnpauseXCommand(bundleJob).call();
119                            LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
120                        }
121                    }
122                }
123            }
124            catch (Exception ex) {
125                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
126            }
127            // start bundles as needed;
128            try {
129                jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
130                if (jobList != null) {
131                    for (BundleJobBean bundleJob : jobList) {
132                        bundleJob.setKickoffTime(d);
133                        new BundleStartXCommand(bundleJob.getId()).call();
134                        LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId());
135                    }
136                }
137            }
138            catch (Exception ex) {
139                LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex);
140            }
141        }
142
143        private void updateCoord() {
144            Date d = new Date(); // records the start time of this service run;
145            List<CoordinatorJobBean> jobList = null;
146            Configuration conf = Services.get().getConf();
147            boolean backwardSupportForCoordStatus = conf.getBoolean(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, false);
148
149            // pause coordinators as needed;
150            try {
151                jobList = jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
152                if (jobList != null) {
153                    for (CoordinatorJobBean coordJob : jobList) {
154                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
155                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
156                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
157                            continue;
158                        }
159                        if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) {
160                            new CoordPauseXCommand(coordJob).call();
161                            LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId());
162                        }
163                    }
164                }
165            }
166            catch (Exception ex) {
167                LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
168            }
169            // unpause coordinators as needed;
170            try {
171                jobList = jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
172                if (jobList != null) {
173                    for (CoordinatorJobBean coordJob : jobList) {
174                        // if namespace 0.1 is used and backward support is true, then ignore this coord job
175                        if (backwardSupportForCoordStatus == true && coordJob.getAppNamespace() != null
176                                && coordJob.getAppNamespace().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
177                            continue;
178                        }
179                        if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) {
180                            new CoordUnpauseXCommand(coordJob).call();
181                            LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
182                        }
183                    }
184                }
185            }
186            catch (Exception ex) {
187                LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex);
188            }
189        }
190    }
191
192    /**
193     * Initializes the {@link PauseTransitService}.
194     *
195     * @param services services instance.
196     */
197    @Override
198    public void init(Services services) {
199        Configuration conf = services.getConf();
200        Runnable bundlePauseStartRunnable = new PauseTransitRunnable();
201        services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10,
202                conf.getInt(CONF_BUNDLE_PAUSE_START_INTERVAL, 60), SchedulerService.Unit.SEC);
203    }
204
205    /**
206     * Destroy the StateTransit Jobs Service.
207     */
208    @Override
209    public void destroy() {
210    }
211
212    /**
213     * Return the public interface for the purge jobs service.
214     *
215     * @return {@link PauseTransitService}.
216     */
217    @Override
218    public Class<? extends Service> getInterface() {
219        return PauseTransitService.class;
220    }
221}