001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.URI;
023import java.util.Arrays;
024import java.util.Date;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.CoordinatorActionBean;
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.XException;
031import org.apache.oozie.client.CoordinatorAction;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.command.CommandException;
035import org.apache.oozie.command.PreconditionException;
036import org.apache.oozie.dependency.DependencyChecker;
037import org.apache.oozie.dependency.ActionDependency;
038import org.apache.oozie.dependency.URIHandler;
039import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
040import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
041import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
042import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.service.CallableQueueService;
045import org.apache.oozie.service.EventHandlerService;
046import org.apache.oozie.service.JPAService;
047import org.apache.oozie.service.RecoveryService;
048import org.apache.oozie.service.Service;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.service.URIHandlerService;
051import org.apache.oozie.util.LogUtils;
052import org.apache.oozie.util.StatusUtils;
053import org.apache.oozie.util.XConfiguration;
054import org.apache.oozie.util.XLog;
055
056public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
057    protected String actionId;
058    protected JPAService jpaService = null;
059    protected CoordinatorActionBean coordAction = null;
060    protected CoordinatorJobBean coordJob = null;
061
062    /**
063     * Property name of command re-queue interval for coordinator push check in
064     * milliseconds.
065     */
066    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
067            + "coord.push.check.requeue.interval";
068    /**
069     * Default re-queue interval in ms. It is applied when no value defined in
070     * the oozie configuration.
071     */
072    private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
073    private boolean registerForNotification;
074    private boolean removeAvailDependencies;
075
076    public CoordPushDependencyCheckXCommand(String actionId) {
077        this(actionId, false, true);
078    }
079
080    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
081        this(actionId, registerForNotification, !registerForNotification);
082    }
083
084    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
085            boolean removeAvailDependencies) {
086        super("coord_push_dep_check", "coord_push_dep_check", 0);
087        this.actionId = actionId;
088        this.registerForNotification = registerForNotification;
089        this.removeAvailDependencies = removeAvailDependencies;
090    }
091
092    protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
093        super(actionName, actionName, 0);
094        this.actionId = actionId;
095    }
096
097    @Override
098    protected Void execute() throws CommandException {
099        String pushMissingDeps = coordAction.getPushMissingDependencies();
100        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
101            LOG.info("Nothing to check. Empty push missing dependency");
102        }
103        else {
104            String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
105            LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
106            LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
107            if (registerForNotification) {
108                LOG.debug("Register for notifications is true");
109            }
110
111            try {
112                Configuration actionConf = null;
113                try {
114                    actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
115                }
116                catch (IOException e) {
117                    throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
118                }
119
120                // Check all dependencies during materialization to avoid registering in the cache.
121                // But check only first missing one afterwards similar to
122                // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
123                ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
124                        !registerForNotification);
125
126                boolean isChangeInDependency = true;
127                boolean timeout = false;
128                if (actionDep.getMissingDependencies().size() == 0) {
129                    // All push-based dependencies are available
130                    onAllPushDependenciesAvailable();
131                }
132                else {
133                    if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
134                        isChangeInDependency = false;
135                    }
136                    else {
137                        String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
138                                .getMissingDependencies());
139                        coordAction.setPushMissingDependencies(stillMissingDeps);
140                    }
141                    // Checking for timeout
142                    timeout = isTimeout();
143                    if (timeout) {
144                        queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
145                    }
146                    else {
147                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
148                                getCoordPushCheckRequeueInterval());
149                    }
150                }
151
152                updateCoordAction(coordAction, isChangeInDependency);
153                if (registerForNotification) {
154                    registerForNotification(actionDep.getMissingDependencies(), actionConf);
155                }
156                if (removeAvailDependencies) {
157                    unregisterAvailableDependencies(actionDep.getAvailableDependencies());
158                }
159                if (timeout) {
160                    unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
161                }
162            }
163            catch (Exception e) {
164                final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
165                if (isTimeout()) {
166                    LOG.debug("Queueing timeout command");
167                    // XCommand.queue() will not work when there is a Exception
168                    callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
169                    unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
170                }
171                else if (coordAction.getMissingDependencies() != null
172                        && coordAction.getMissingDependencies().length() > 0) {
173                    // Queue again on exception as RecoveryService will not queue this again with
174                    // the action being updated regularly by CoordActionInputCheckXCommand
175                    callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
176                            registerForNotification, removeAvailDependencies),
177                            Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
178                }
179                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
180            }
181        }
182        return null;
183    }
184
185    /**
186     * Return the re-queue interval for coord push dependency check
187     * @return
188     */
189    public long getCoordPushCheckRequeueInterval() {
190        long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
191                DEFAULT_COMMAND_REQUEUE_INTERVAL);
192        return requeueInterval;
193    }
194
195    /**
196     * Returns true if timeout period has been reached
197     *
198     * @return true if it is time for timeout else false
199     */
200    protected boolean isTimeout() {
201        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
202                .getCreatedTime().getTime()))
203                / (60 * 1000);
204        int timeOut = coordAction.getTimeOut();
205        return (timeOut >= 0) && (waitingTime > timeOut);
206    }
207
208    protected void onAllPushDependenciesAvailable() throws CommandException {
209        coordAction.setPushMissingDependencies("");
210        if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
211            Date nominalTime = coordAction.getNominalTime();
212            Date currentTime = new Date();
213            // The action should become READY only if current time > nominal time;
214            // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
215            if (nominalTime.compareTo(currentTime) > 0) {
216                LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
217                        + currentTime + ", nominal=" + nominalTime);
218            }
219            else {
220                String actionXml = resolveCoordConfiguration();
221                coordAction.setActionXml(actionXml);
222                coordAction.setStatus(CoordinatorAction.Status.READY);
223                // pass jobID to the CoordActionReadyXCommand
224                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
225            }
226        }
227        else if (isTimeout()) {
228            // If it is timeout and all push dependencies are available but still some unresolved
229            // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
230            // wait till RecoveryService kicks in
231            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
232        }
233    }
234
235    private String resolveCoordConfiguration() throws CommandException {
236        try {
237            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
238            StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
239            String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
240                    actionId);
241            actionXml.replace(0, actionXml.length(), newActionXml);
242            return actionXml.toString();
243        }
244        catch (Exception e) {
245            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
246        }
247    }
248
249    protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
250            throws CommandException {
251        coordAction.setLastModifiedTime(new Date());
252        if (jpaService != null) {
253            try {
254                if (isChangeInDependency) {
255                    jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
256                    if (EventHandlerService.isEnabled()
257                            && coordAction.getStatus() != CoordinatorAction.Status.READY) {
258                        //since event is not to be generated unless action RUNNING via StartX
259                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
260                    }
261                }
262                else {
263                    jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
264                }
265            }
266            catch (JPAExecutorException jex) {
267                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
268            }
269        }
270    }
271
272    private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
273        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
274        String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
275        for (String missingDep : missingDeps) {
276            try {
277                URI missingURI = new URI(missingDep);
278                URIHandler handler = uriService.getURIHandler(missingURI);
279                handler.registerForNotification(missingURI, actionConf, user, actionId);
280                    LOG.debug("Registered uri [{0}] for notifications", missingURI);
281            }
282            catch (Exception e) {
283                LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
284            }
285        }
286    }
287
288    private void unregisterAvailableDependencies(List<String> availableDeps) {
289        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
290        for (String availableDep : availableDeps) {
291            try {
292                URI availableURI = new URI(availableDep);
293                URIHandler handler = uriService.getURIHandler(availableURI);
294                if (handler.unregisterFromNotification(availableURI, actionId)) {
295                    LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
296                }
297                else {
298                    LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
299                }
300            }
301            catch (Exception e) {
302                LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
303            }
304        }
305    }
306
307    public static void unregisterMissingDependencies(List<String> missingDeps, String actionId) {
308        final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
309        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
310        for (String missingDep : missingDeps) {
311            try {
312                URI missingURI = new URI(missingDep);
313                URIHandler handler = uriService.getURIHandler(missingURI);
314                if (handler.unregisterFromNotification(missingURI, actionId)) {
315                    LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
316                }
317                else {
318                    LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
319                }
320            }
321            catch (Exception e) {
322                LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
323            }
324        }
325    }
326
327    @Override
328    public String getEntityKey() {
329        return coordAction.getJobId();
330    }
331
332    @Override
333    public String getKey(){
334        return getName() + "_" + actionId;
335    }
336
337    @Override
338    protected boolean isLockRequired() {
339        return true;
340    }
341
342    @Override
343    protected void eagerLoadState() throws CommandException {
344        try {
345            jpaService = Services.get().get(JPAService.class);
346
347            if (jpaService != null) {
348                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
349                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
350                LogUtils.setLogInfo(coordAction, logInfo);
351            }
352            else {
353                throw new CommandException(ErrorCode.E0610);
354            }
355        }
356        catch (XException ex) {
357            throw new CommandException(ex);
358        }
359    }
360
361    @Override
362    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
363        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
364            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
365                    + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
366                    + coordAction.getStatus());
367        }
368
369        // if eligible to do action input check when running with backward
370        // support is true
371        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
372            return;
373        }
374
375        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
376                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
377            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
378                    + "]::CoordPushDependencyCheck:: Ignoring action."
379                    + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
380                    + coordJob.getStatus());
381        }
382    }
383
384    @Override
385    protected void loadState() throws CommandException {
386        eagerLoadState();
387    }
388
389    @Override
390    protected void verifyPrecondition() throws CommandException, PreconditionException {
391        eagerVerifyPrecondition();
392    }
393
394}