001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.util.Collection;
021import java.util.Date;
022
023import javax.persistence.EntityManager;
024import javax.persistence.Query;
025
026import org.apache.oozie.CoordinatorActionBean;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.FaultInjection;
029import org.apache.oozie.client.rest.JsonBean;
030import org.apache.oozie.util.ParamChecker;
031
032/**
033 * Class for inserting and updating beans in bulk
034 *
035 */
036public class BulkUpdateInsertForCoordActionStatusJPAExecutor implements JPAExecutor<Void> {
037
038    private Collection<JsonBean> updateList;
039    private Collection<JsonBean> insertList;
040
041    /**
042     * Initialize the JPAExecutor using the update and insert list of JSON beans
043     *
044     * @param updateList
045     */
046    public BulkUpdateInsertForCoordActionStatusJPAExecutor(Collection<JsonBean> updateList,
047            Collection<JsonBean> insertList) {
048        this.updateList = updateList;
049        this.insertList = insertList;
050    }
051
052    public BulkUpdateInsertForCoordActionStatusJPAExecutor() {
053    }
054
055    /**
056     * Sets the update list for JSON bean
057     *
058     * @param updateList
059     */
060    public void setUpdateList(Collection<JsonBean> updateList) {
061        this.updateList = updateList;
062    }
063
064    /**
065     * Sets the insert list for JSON bean
066     *
067     * @param insertList
068     */
069    public void setInsertList(Collection<JsonBean> insertList) {
070        this.insertList = insertList;
071    }
072
073    /*
074     * (non-Javadoc)
075     *
076     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
077     */
078    @Override
079    public String getName() {
080        return "BulkUpdateInsertForCoordActionStatusJPAExecutor";
081    }
082
083    /*
084     * (non-Javadoc)
085     *
086     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
087     * EntityManager)
088     */
089    @Override
090    public Void execute(EntityManager em) throws JPAExecutorException {
091        try {
092            if (insertList != null) {
093                for (JsonBean entity : insertList) {
094                    ParamChecker.notNull(entity, "JsonBean");
095                    em.persist(entity);
096                }
097            }
098            // Only used by test cases to check for rollback of transaction
099            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
100            if (updateList != null) {
101                for (JsonBean entity : updateList) {
102                    ParamChecker.notNull(entity, "JsonBean");
103                    if (entity instanceof CoordinatorActionBean) {
104                        CoordinatorActionBean action = (CoordinatorActionBean) entity;
105                        Query q = em.createNamedQuery("UPDATE_COORD_ACTION_STATUS_PENDING_TIME");
106                        q.setParameter("id", action.getId());
107                        q.setParameter("status", action.getStatus().toString());
108                        q.setParameter("pending", action.getPending());
109                        q.setParameter("lastModifiedTime", new Date());
110                        q.executeUpdate();
111                    }
112                    else {
113                        em.merge(entity);
114                    }
115                }
116            }
117            // Since the return type is Void, we have to return null
118            return null;
119        }
120        catch (Exception e) {
121            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
122        }
123    }
124}