001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.CoordinatorJobBean;
026import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
027import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
028import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
029import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
030import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
031import org.apache.oozie.executor.jpa.JPAExecutorException;
032import org.apache.oozie.util.XCallable;
033import org.apache.oozie.util.XLog;
034
035/**
036 * The coordinator Materialization Lookup trigger service schedule lookup trigger command for every interval (default is
037 * 5 minutes ). This interval could be configured through oozie configuration defined is either oozie-default.xml or
038 * oozie-site.xml using the property name oozie.service.CoordMaterializeTriggerService.lookup.interval
039 */
040public class CoordMaterializeTriggerService implements Service {
041    public static final String CONF_PREFIX = Service.CONF_PREFIX + "CoordMaterializeTriggerService.";
042    /**
043     * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
044     */
045    public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
046    /**
047     * This configuration defined the duration for which job should be materialized in future
048     */
049    public static final String CONF_MATERIALIZATION_WINDOW = CONF_PREFIX + "materialization.window";
050    /**
051     * The number of callables to be queued in a batch.
052     */
053    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
054    /**
055     * The number of coordinator jobs to be picked for materialization at a given time.
056     */
057    public static final String CONF_MATERIALIZATION_SYSTEM_LIMIT = CONF_PREFIX + "materialization.system.limit";
058
059    private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
060    private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
061    private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
062    private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
063    private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
064
065    /**
066     * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand.
067     */
068    static class CoordMaterializeTriggerRunnable implements Runnable {
069        private int materializationWindow;
070        private int lookupInterval;
071        private long delay = 0;
072        private List<XCallable<Void>> callables;
073        private List<XCallable<Void>> delayedCallables;
074
075        public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
076            this.materializationWindow = materializationWindow;
077            this.lookupInterval = lookupInterval;
078        }
079
080        @Override
081        public void run() {
082            runCoordJobMatLookup();
083
084            if (null != callables) {
085                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
086                if (ret == false) {
087                    XLog.getLog(getClass()).warn(
088                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
089                                    + "Most possibly command queue is full. Queue size is :"
090                                    + Services.get().get(CallableQueueService.class).queueSize());
091                }
092                callables = null;
093            }
094            if (null != delayedCallables) {
095                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
096                if (ret == false) {
097                    XLog.getLog(getClass()).warn(
098                            "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
099                                    + "Most possibly Callable queue is full. Queue size is :"
100                                    + Services.get().get(CallableQueueService.class).queueSize());
101                }
102                delayedCallables = null;
103                this.delay = 0;
104            }
105        }
106
107        /**
108         * Recover coordinator jobs that should be materialized
109         */
110        private void runCoordJobMatLookup() {
111            XLog.Info.get().clear();
112            XLog LOG = XLog.getLog(getClass());
113            JPAService jpaService = Services.get().get(JPAService.class);
114            try {
115
116                // get current date
117                Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
118                // get list of all jobs that have actions that should be materialized.
119                int materializationLimit = Services.get().getConf()
120                        .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
121                CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate,
122                        materializationLimit);
123                List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
124                LOG.info("CoordMaterializeTriggerService - Curr Date= " + currDate + ", Num jobs to materialize = "
125                        + materializeJobs.size());
126                for (CoordinatorJobBean coordJob : materializeJobs) {
127                    Services.get().get(InstrumentationService.class).get()
128                            .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
129                    int numWaitingActions = jpaService
130                            .execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
131                    LOG.info("Job :" + coordJob.getId() + "  numWaitingActions : " + numWaitingActions
132                            + " MatThrottle : " + coordJob.getMatThrottling());
133                    // update lastModifiedTime so next time others might have higher chance to get pick up
134                    coordJob.setLastModifiedTime(new Date());
135                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
136                    if (numWaitingActions >= coordJob.getMatThrottling()) {
137                        LOG.info("info for JobID [" + coordJob.getId() + " already waiting "
138                                + numWaitingActions + " actions. MatThrottle is : " + coordJob.getMatThrottling());
139                        continue;
140                    }
141                    queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
142
143                }
144
145            }
146            catch (JPAExecutorException jex) {
147                LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
148            }
149        }
150
151        /**
152         * Adds callables to a list. If the number of callables in the list reaches {@link
153         * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
154         * is reset.
155         *
156         * @param callable the callable to queue.
157         */
158        private void queueCallable(XCallable<Void> callable) {
159            if (callables == null) {
160                callables = new ArrayList<XCallable<Void>>();
161            }
162            callables.add(callable);
163            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
164                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
165                if (ret == false) {
166                    XLog.getLog(getClass()).warn(
167                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
168                                    + "Most possibly command queue is full. Queue size is :"
169                                    + Services.get().get(CallableQueueService.class).queueSize());
170                }
171                callables = new ArrayList<XCallable<Void>>();
172            }
173        }
174
175    }
176
177    @Override
178    public void init(Services services) throws ServiceException {
179        Configuration conf = services.getConf();
180        // default is 3600sec (1hr)
181        int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
182        // default is 300sec (5min)
183        int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
184
185        Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
186
187        services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval,
188                                                      SchedulerService.Unit.SEC);
189    }
190
191    @Override
192    public void destroy() {
193    }
194
195    @Override
196    public Class<? extends Service> getInterface() {
197        return CoordMaterializeTriggerService.class;
198    }
199
200}