001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.text.ParseException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Map.Entry;
029
030import javax.persistence.EntityManager;
031import javax.persistence.Query;
032
033import org.apache.oozie.BundleJobBean;
034import org.apache.oozie.client.BundleJob;
035import org.apache.oozie.client.CoordinatorJob;
036import org.apache.oozie.client.rest.BulkResponseImpl;
037import org.apache.oozie.BulkResponseInfo;
038import org.apache.oozie.CoordinatorActionBean;
039import org.apache.oozie.CoordinatorJobBean;
040import org.apache.oozie.ErrorCode;
041import org.apache.oozie.client.CoordinatorAction;
042import org.apache.oozie.util.DateUtils;
043import org.apache.oozie.util.ParamChecker;
044
045/**
046 * The query executor class for bulk monitoring queries i.e. debugging bundle ->
047 * coord actions directly
048 */
049public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
050    private Map<String, List<String>> bulkFilter;
051    // defaults
052    private int start = 1;
053    private int len = 50;
054
055    public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
056        ParamChecker.notNull(bulkFilter, "bulkFilter");
057        this.bulkFilter = bulkFilter;
058        this.start = start;
059        this.len = len;
060    }
061
062    /*
063     * (non-Javadoc)
064     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
065     */
066    @Override
067    public String getName() {
068        return "BulkJPAExecutor";
069    }
070
071    /*
072     * (non-Javadoc)
073     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
074     */
075    @Override
076    public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
077        List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
078        Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();
079
080        try {
081            // Lightweight Query 1 on Bundle level to fetch the bundle job
082            // corresponding to name
083            BundleJobBean bundleBean = bundleQuery(em);
084
085            // Join query between coordinator job and coordinator action tables
086            // to get entries for specific bundleId only
087            String conditions = actionQuery(em, bundleBean, actionTimes, responseList);
088
089            // Query to get the count of records
090            long total = countQuery(conditions, em, bundleBean, actionTimes);
091
092            BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total);
093            return bulk;
094        }
095        catch (Exception e) {
096            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
097        }
098    }
099
100    @SuppressWarnings("unchecked")
101    private BundleJobBean bundleQuery(EntityManager em) throws JPAExecutorException {
102        BundleJobBean bundleBean = new BundleJobBean();
103        String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0);
104        Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
105        q.setParameter("appName", bundleName);
106        List<Object[]> bundles = (List<Object[]>) q.getResultList();
107        if (bundles.isEmpty()) {
108            throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: "
109                    + bundleName);
110        }
111        if (bundles.size() > 1) { // more than one bundles running with same
112                                  // name - ERROR. Fail fast
113            throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: "
114                    + bundleName);
115        }
116        bundleBean = getBeanForBundleJob(bundles.get(0), bundleName);
117        return bundleBean;
118    }
119
120    @SuppressWarnings("unchecked")
121    private String actionQuery(EntityManager em, BundleJobBean bundleBean,
122            Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException {
123        Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
124        StringBuilder getActions = new StringBuilder(q.toString());
125        StringBuilder conditionClause = new StringBuilder();
126        conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME)));
127        conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
128        int offset = getActions.indexOf("ORDER");
129        getActions.insert(offset - 1, conditionClause);
130        timesClause(getActions, offset, times);
131        q = em.createQuery(getActions.toString());
132        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
133        while (iter.hasNext()) {
134            Entry<String, Timestamp> time = iter.next();
135            q.setParameter(time.getKey(), time.getValue());
136        }
137        q.setParameter("bundleId", bundleBean.getId());
138        // pagination
139        q.setFirstResult(start - 1);
140        q.setMaxResults(len);
141
142        List<Object[]> response = q.getResultList();
143        for (Object[] r : response) {
144            BulkResponseImpl br = getResponseFromObject(bundleBean, r);
145            responseList.add(br);
146        }
147        return q.toString();
148    }
149
150    private long countQuery(String clause, EntityManager em, BundleJobBean bundleBean, Map<String, Timestamp> times) {
151        Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
152        StringBuilder getTotal = new StringBuilder(q.toString() + " ");
153        getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER")));
154        q = em.createQuery(getTotal.toString());
155        q.setParameter("bundleId", bundleBean.getId());
156        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
157        while (iter.hasNext()) {
158            Entry<String, Timestamp> time = iter.next();
159            q.setParameter(time.getKey(), time.getValue());
160        }
161        long total = ((Long) q.getSingleResult()).longValue();
162        return total;
163    }
164
165    // Form the where clause to filter by coordinator names
166    private StringBuilder coordNamesClause(List<String> coordNames) {
167        StringBuilder sb = new StringBuilder();
168        boolean firstVal = true;
169        for (String name : nullToEmpty(coordNames)) {
170            if (firstVal) {
171                sb.append(" AND c.appName IN (\'" + name + "\'");
172                firstVal = false;
173            }
174            else {
175                sb.append(",\'" + name + "\'");
176            }
177        }
178        if (!firstVal) {
179            sb.append(") ");
180        }
181        return sb;
182    }
183
184    // Form the where clause to filter by coord action status
185    private StringBuilder statusClause(List<String> statuses) {
186        StringBuilder sb = new StringBuilder();
187        boolean firstVal = true;
188        for (String status : nullToEmpty(statuses)) {
189            if (firstVal) {
190                sb.append(" AND a.status IN (\'" + status + "\'");
191                firstVal = false;
192            }
193            else {
194                sb.append(",\'" + status + "\'");
195            }
196        }
197        if (!firstVal) {
198            sb.append(") ");
199        }
200        else { // statuses was null. adding default
201            sb.append(" AND a.status IN ('KILLED', 'FAILED') ");
202        }
203        return sb;
204    }
205
206    private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
207        Timestamp ts = null;
208        List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
209        if (times != null) {
210            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
211            sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated");
212            eachTime.put("startCreated", ts);
213        }
214        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
215        if (times != null) {
216            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
217            sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated");
218            eachTime.put("endCreated", ts);
219        }
220        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
221        if (times != null) {
222            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
223            sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal");
224            eachTime.put("startNominal", ts);
225        }
226        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
227        if (times != null) {
228            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
229            sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal");
230            eachTime.put("endNominal", ts);
231        }
232    }
233
234    private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
235        BulkResponseImpl bean = new BulkResponseImpl();
236        CoordinatorJobBean coordBean = new CoordinatorJobBean();
237        CoordinatorActionBean actionBean = new CoordinatorActionBean();
238        if (arr[0] != null) {
239            actionBean.setId((String) arr[0]);
240        }
241        if (arr[1] != null) {
242            actionBean.setActionNumber((Integer) arr[1]);
243        }
244        if (arr[2] != null) {
245            actionBean.setErrorCode((String) arr[2]);
246        }
247        if (arr[3] != null) {
248            actionBean.setErrorMessage((String) arr[3]);
249        }
250        if (arr[4] != null) {
251            actionBean.setExternalId((String) arr[4]);
252        }
253        if (arr[5] != null) {
254            actionBean.setExternalStatus((String) arr[5]);
255        }
256        if (arr[6] != null) {
257            actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
258        }
259        if (arr[7] != null) {
260            actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
261        }
262        if (arr[8] != null) {
263            actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
264        }
265        if (arr[9] != null) {
266            actionBean.setMissingDependencies((String) arr[9]);
267        }
268        if (arr[10] != null) {
269            coordBean.setId((String) arr[10]);
270            actionBean.setJobId((String) arr[10]);
271        }
272        if (arr[11] != null) {
273            coordBean.setAppName((String) arr[11]);
274        }
275        if (arr[12] != null) {
276            coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
277        }
278        bean.setBundle(bundleBean);
279        bean.setCoordinator(coordBean);
280        bean.setAction(actionBean);
281        return bean;
282    }
283
284    private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws JPAExecutorException {
285        BundleJobBean bean = new BundleJobBean();
286        if (barr[0] != null) {
287            bean.setId((String) barr[0]);
288        }
289        else {
290            throw new JPAExecutorException(ErrorCode.E0603,
291                    "bundleId returned by query is null - cannot retrieve bulk results");
292        }
293        bean.setAppName(name);
294        if (barr[1] != null) {
295            bean.setStatus(BundleJob.Status.valueOf((String) barr[1]));
296        }
297        if (barr[2] != null) {
298            bean.setUser((String) barr[2]);
299        }
300        return bean;
301    }
302
303    // null safeguard
304    public static List<String> nullToEmpty(List<String> input) {
305        return input == null ? Collections.<String> emptyList() : input;
306    }
307
308}