/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;

041/**
042 * This class is used to purge workflows, coordinators, and bundles.  It takes into account the relationships between workflows and
043 * coordinators, and coordinators and bundles.  It also only acts on 'limit' number of items at a time to not overtax the DB and in
044 * case something gets rolled back.  Also, children are always deleted before their parents in case of a rollback.
045 */
046public class PurgeXCommand extends XCommand<Void> {
047    private JPAService jpaService = null;
048    private int wfOlderThan;
049    private int coordOlderThan;
050    private int bundleOlderThan;
051    private final int limit;
052    private List<String> wfList;
053    private List<String> coordList;
054    private List<String> bundleList;
055    private int wfDel;
056    private int coordDel;
057    private int bundleDel;
058
059    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
060        super("purge", "purge", 0);
061        this.wfOlderThan = wfOlderThan;
062        this.coordOlderThan = coordOlderThan;
063        this.bundleOlderThan = bundleOlderThan;
064        this.limit = limit;
065        wfList = new ArrayList<String>();
066        coordList = new ArrayList<String>();
067        bundleList = new ArrayList<String>();
068        wfDel = 0;
069        coordDel = 0;
070        bundleDel = 0;
071    }
072
073    /* (non-Javadoc)
074     * @see org.apache.oozie.command.XCommand#loadState()
075     */
076    @Override
077    protected void loadState() throws CommandException {
078        try {
079            jpaService = Services.get().get(JPAService.class);
080
081            if (jpaService != null) {
082                // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents)
083                int size;
084                do {
085                    size = wfList.size();
086                    wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
087                } while(size != wfList.size());
088                do {
089                    size = coordList.size();
090                    coordList.addAll(jpaService.execute(
091                            new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
092                } while(size != coordList.size());
093                do {
094                    size = bundleList.size();
095                    bundleList.addAll(jpaService.execute(
096                            new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
097                } while(size != bundleList.size());
098            }
099            else {
100                throw new CommandException(ErrorCode.E0610);
101            }
102        }
103        catch (XException ex) {
104            throw new CommandException(ex);
105        }
106    }
107
108    /* (non-Javadoc)
109     * @see org.apache.oozie.command.XCommand#execute()
110     */
111    @Override
112    protected Void execute() throws CommandException {
113        LOG.debug("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
114                + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
115
116        // Process parentless workflows to purge them and their children
117        if (!wfList.isEmpty()) {
118            try {
119                processWorkflows(wfList);
120            }
121            catch (JPAExecutorException je) {
122                throw new CommandException(je);
123            }
124        }
125
126        // Processs parentless coordinators to purge them and their children
127        if (!coordList.isEmpty()) {
128            try {
129                processCoordinators(coordList);
130            }
131            catch (JPAExecutorException je) {
132                throw new CommandException(je);
133            }
134        }
135
136        // Process bundles to purge them and their children
137        if (!bundleList.isEmpty()) {
138            try {
139                processBundles(bundleList);
140            }
141            catch (JPAExecutorException je) {
142                throw new CommandException(je);
143            }
144        }
145
146        LOG.debug("ENDED Purge deleted [{0}] workflows, [{1}] coordinators, [{2}] bundles", wfDel, coordDel, bundleDel);
147        return null;
148    }
149
150    /**
151     * Process workflows to purge them and their children.  Uses the processWorkflowsHelper method to help via recursion to make
152     * sure that the workflow children are deleted before their parents.
153     *
154     * @param wfs List of workflows to process
155     * @throws JPAExecutorException If a JPA executor has a problem
156     */
157    private void processWorkflows(List<String> wfs) throws JPAExecutorException {
158        List<String> wfsToPurge = processWorkflowsHelper(wfs);
159        purgeWorkflows(wfsToPurge);
160    }
161
162    /**
163     * Used by the processWorkflows method and via recursion.
164     *
165     * @param wfs List of workflows to process
166     * @return List of workflows to purge
167     * @throws JPAExecutorException If a JPA executor has a problem
168     */
169    private List<String> processWorkflowsHelper(List<String> wfs) throws JPAExecutorException {
170        // If the list is empty, then we've finished recursing
171        if (wfs.isEmpty()) {
172            return wfs;
173        }
174        List<String> subwfs = new ArrayList<String>();
175        List<String> wfsToPurge = new ArrayList<String>();
176        for (String wfId : wfs) {
177            // We only purge the workflow and its children if they are all ready to be purged
178            long numChildrenNotReady = jpaService.execute(
179                    new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, wfId));
180            if (numChildrenNotReady == 0) {
181                wfsToPurge.add(wfId);
182                // Get all of the direct children for this workflow
183                List<String> children = new ArrayList<String>();
184                int size;
185                do {
186                    size = children.size();
187                    children.addAll(jpaService.execute(new WorkflowJobsGetFromParentIdJPAExecutor(wfId, children.size(), limit)));
188                } while (size != children.size());
189                subwfs.addAll(children);
190            }
191        }
192        // Recurse on the children we just found to process their children
193        wfsToPurge.addAll(processWorkflowsHelper(subwfs));
194        return wfsToPurge;
195    }
196
197    /**
198     * Process coordinators to purge them and their children.
199     *
200     * @param coords List of coordinators to process
201     * @throws JPAExecutorException If a JPA executor has a problem
202     */
203    private void processCoordinators(List<String> coords) throws JPAExecutorException {
204        List<String> wfsToPurge = new ArrayList<String>();
205        List<String> coordsToPurge = new ArrayList<String>();
206        for (String coordId : coords) {
207            // We only purge the coord and its children if they are all ready to be purged
208            long numChildrenNotReady = jpaService.execute(
209                    new WorkflowJobsCountNotForPurgeFromParentIdJPAExecutor(wfOlderThan, coordId));
210            if (numChildrenNotReady == 0) {
211                coordsToPurge.add(coordId);
212                // Get all of the direct children for this coord
213                List<String> children = new ArrayList<String>();
214                int size;
215                do {
216                    size = children.size();
217                    children.addAll(jpaService.execute(
218                            new WorkflowJobsGetFromParentIdJPAExecutor(coordId, children.size(), limit)));
219                } while (size != children.size());
220                wfsToPurge.addAll(children);
221            }
222        }
223        // Process the children
224        processWorkflows(wfsToPurge);
225        // Now that all children have been purged, we can purge the coordinators
226        purgeCoordinators(coordsToPurge);
227    }
228
229    /**
230     * Process bundles to purge them and their children
231     *
232     * @param bundles List of bundles to process
233     * @throws JPAExecutorException If a JPA executor has a problem
234     */
235    private void processBundles(List<String> bundles) throws JPAExecutorException {
236        List<String> coordsToPurge = new ArrayList<String>();
237        List<String> bundlesToPurge = new ArrayList<String>();
238        for (Iterator<String> it = bundles.iterator(); it.hasNext(); ) {
239            String bundleId = it.next();
240            // We only purge the bundle and its children if they are all ready to be purged
241            long numChildrenNotReady = jpaService.execute(
242                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
243            if (numChildrenNotReady == 0) {
244                bundlesToPurge.add(bundleId);
245                // Get all of the direct children for this bundle
246                List<String> children = new ArrayList<String>();
247                int size;
248                do {
249                    size = children.size();
250                    children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
251                } while (size != children.size());
252                coordsToPurge.addAll(children);
253            }
254        }
255        // Process the children
256        processCoordinators(coordsToPurge);
257        // Now that all children have been purged, we can purge the bundles
258        purgeBundles(bundlesToPurge);
259    }
260
261    /**
262     * Purge the workflows in REVERSE order in batches of size 'limit' (this must be done in reverse order so that children are
263     * purged before their parents)
264     *
265     * @param wfs List of workflows to purge
266     * @throws JPAExecutorException If a JPA executor has a problem
267     */
268    private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
269        wfDel += wfs.size();
270        Collections.reverse(wfs);
271        for (int startIndex = 0; startIndex < wfs.size(); ) {
272            int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size();
273            jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex)));
274            startIndex = endIndex;
275        }
276    }
277
278    /**
279     * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
280     *
281     * @param coords List of coordinators to purge
282     * @throws JPAExecutorException If a JPA executor has a problem
283     */
284    private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
285        coordDel += coords.size();
286        for (int startIndex = 0; startIndex < coords.size(); ) {
287            int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size();
288            jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex)));
289            startIndex = endIndex;
290        }
291    }
292
293    /**
294     * Purge the bundles in SOME order in batches of size 'limit' (its in reverse order only for convenience)
295     *
296     * @param bundles List of bundles to purge
297     * @throws JPAExecutorException If a JPA executor has a problem
298     */
299    private void purgeBundles(List<String> bundles) throws JPAExecutorException {
300        bundleDel += bundles.size();
301        for (int startIndex = 0; startIndex < bundles.size(); ) {
302            int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size();
303            jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex)));
304            startIndex = endIndex;
305        }
306    }
307
308    /* (non-Javadoc)
309     * @see org.apache.oozie.command.XCommand#getEntityKey()
310     */
311    @Override
312    public String getEntityKey() {
313        return null;
314    }
315
316    /* (non-Javadoc)
317     * @see org.apache.oozie.command.XCommand#isLockRequired()
318     */
319    @Override
320    protected boolean isLockRequired() {
321        return false;
322    }
323
324    /* (non-Javadoc)
325     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
326     */
327    @Override
328    protected void verifyPrecondition() throws CommandException, PreconditionException {
329    }
330}