001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.io.IOException;
021import java.text.MessageFormat;
022import java.util.Properties;
023
024import javax.persistence.EntityManager;
025import javax.persistence.EntityManagerFactory;
026import javax.persistence.Persistence;
027import javax.persistence.PersistenceException;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.BundleActionBean;
031import org.apache.oozie.BundleJobBean;
032import org.apache.oozie.CoordinatorActionBean;
033import org.apache.oozie.CoordinatorJobBean;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.FaultInjection;
036import org.apache.oozie.SLAEventBean;
037import org.apache.oozie.WorkflowActionBean;
038import org.apache.oozie.WorkflowJobBean;
039import org.apache.oozie.client.rest.JsonBundleJob;
040import org.apache.oozie.client.rest.JsonCoordinatorAction;
041import org.apache.oozie.client.rest.JsonCoordinatorJob;
042import org.apache.oozie.client.rest.JsonSLAEvent;
043import org.apache.oozie.client.rest.JsonWorkflowAction;
044import org.apache.oozie.client.rest.JsonWorkflowJob;
045import org.apache.oozie.executor.jpa.JPAExecutor;
046import org.apache.oozie.executor.jpa.JPAExecutorException;
047import org.apache.oozie.sla.SLARegistrationBean;
048import org.apache.oozie.sla.SLASummaryBean;
049import org.apache.oozie.util.IOUtils;
050import org.apache.oozie.util.Instrumentable;
051import org.apache.oozie.util.Instrumentation;
052import org.apache.oozie.util.XLog;
053import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
054
055/**
056 * Service that manages JPA and executes {@link JPAExecutor}.
057 */
058@SuppressWarnings("deprecation")
059public class JPAService implements Service, Instrumentable {
060    private static final String INSTRUMENTATION_GROUP = "jpa";
061
062    public static final String CONF_DB_SCHEMA = "oozie.db.schema.name";
063
064    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JPAService.";
065    public static final String CONF_URL = CONF_PREFIX + "jdbc.url";
066    public static final String CONF_DRIVER = CONF_PREFIX + "jdbc.driver";
067    public static final String CONF_USERNAME = CONF_PREFIX + "jdbc.username";
068    public static final String CONF_PASSWORD = CONF_PREFIX + "jdbc.password";
069    public static final String CONF_CONN_DATA_SOURCE = CONF_PREFIX + "connection.data.source";
070
071    public static final String CONF_MAX_ACTIVE_CONN = CONF_PREFIX + "pool.max.active.conn";
072    public static final String CONF_CREATE_DB_SCHEMA = CONF_PREFIX + "create.db.schema";
073    public static final String CONF_VALIDATE_DB_CONN = CONF_PREFIX + "validate.db.connection";
074    public static final String CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL = CONF_PREFIX + "validate.db.connection.eviction.interval";
075    public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
076
077
078    private EntityManagerFactory factory;
079    private Instrumentation instr;
080
081    private static XLog LOG;
082
083    /**
084     * Return the public interface of the service.
085     *
086     * @return {@link JPAService}.
087     */
088    public Class<? extends Service> getInterface() {
089        return JPAService.class;
090    }
091
092    @Override
093    public void instrument(Instrumentation instr) {
094        this.instr = instr;
095    }
096
097    /**
098     * Initializes the {@link JPAService}.
099     *
100     * @param services services instance.
101     */
102    public void init(Services services) throws ServiceException {
103        LOG = XLog.getLog(JPAService.class);
104        Configuration conf = services.getConf();
105        String dbSchema = conf.get(CONF_DB_SCHEMA, "oozie");
106        String url = conf.get(CONF_URL, "jdbc:derby:${oozie.home.dir}/${oozie.db.schema.name}-db;create=true");
107        String driver = conf.get(CONF_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
108        String user = conf.get(CONF_USERNAME, "sa");
109        String password = conf.get(CONF_PASSWORD, "").trim();
110        String maxConn = conf.get(CONF_MAX_ACTIVE_CONN, "10").trim();
111        String dataSource = conf.get(CONF_CONN_DATA_SOURCE, "org.apache.commons.dbcp.BasicDataSource");
112        boolean autoSchemaCreation = conf.getBoolean(CONF_CREATE_DB_SCHEMA, true);
113        boolean validateDbConn = conf.getBoolean(CONF_VALIDATE_DB_CONN, false);
114        String evictionInterval = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_INTERVAL, "300000").trim();
115        String evictionNum = conf.get(CONF_VALIDATE_DB_CONN_EVICTION_NUM, "10").trim();
116
117        if (!url.startsWith("jdbc:")) {
118            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, must start with 'jdbc:'");
119        }
120        String dbType = url.substring("jdbc:".length());
121        if (dbType.indexOf(":") <= 0) {
122            throw new ServiceException(ErrorCode.E0608, url, "invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
123        }
124        dbType = dbType.substring(0, dbType.indexOf(":"));
125
126        String persistentUnit = "oozie-" + dbType;
127
128        // Checking existince of ORM file for DB type
129        String ormFile = "META-INF/" + persistentUnit + "-orm.xml";
130        try {
131            IOUtils.getResourceAsStream(ormFile, -1);
132        }
133        catch (IOException ex) {
134            throw new ServiceException(ErrorCode.E0609, dbType, ormFile);
135        }
136
137        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
138        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
139        Properties props = new Properties();
140        if (autoSchemaCreation) {
141            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
142            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
143        }
144        else if (validateDbConn) {
145            // validation can be done only if the schema already exist, else a
146            // connection cannot be obtained to create the schema.
147            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
148            String num = "numTestsPerEvictionRun=" + evictionNum;
149            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
150            connProps += ",ValidationQuery=select count(*) from VALIDATE_CONN";
151            connProps = MessageFormat.format(connProps, dbSchema);
152        }
153        else {
154            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
155        }
156        props.setProperty("openjpa.ConnectionProperties", connProps);
157
158        props.setProperty("openjpa.ConnectionDriverName", dataSource);
159
160        factory = Persistence.createEntityManagerFactory(persistentUnit, props);
161
162        EntityManager entityManager = getEntityManager();
163        entityManager.find(WorkflowActionBean.class, 1);
164        entityManager.find(WorkflowJobBean.class, 1);
165        entityManager.find(CoordinatorActionBean.class, 1);
166        entityManager.find(CoordinatorJobBean.class, 1);
167        entityManager.find(JsonWorkflowAction.class, 1);
168        entityManager.find(JsonWorkflowJob.class, 1);
169        entityManager.find(JsonCoordinatorAction.class, 1);
170        entityManager.find(JsonCoordinatorJob.class, 1);
171        entityManager.find(SLAEventBean.class, 1);
172        entityManager.find(JsonSLAEvent.class, 1);
173        entityManager.find(BundleJobBean.class, 1);
174        entityManager.find(JsonBundleJob.class, 1);
175        entityManager.find(BundleActionBean.class, 1);
176        entityManager.find(SLARegistrationBean.class, 1);
177        entityManager.find(SLASummaryBean.class, 1);
178
179        LOG.info(XLog.STD, "All entities initialized");
180        // need to use a pseudo no-op transaction so all entities, datasource
181        // and connection pool are initialized one time only
182        entityManager.getTransaction().begin();
183        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) factory;
184        // Mask the password with '***'
185        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
186        LOG.info("JPA configuration: {0}", logMsg);
187        entityManager.getTransaction().commit();
188        entityManager.close();
189    }
190
191    /**
192     * Destroy the JPAService
193     */
194    public void destroy() {
195        if (factory != null && factory.isOpen()) {
196            factory.close();
197        }
198    }
199
200    /**
201     * Execute a {@link JPAExecutor}.
202     *
203     * @param executor JPAExecutor to execute.
204     * @return return value of the JPAExecutor.
205     * @throws JPAExecutorException thrown if an jpa executor failed
206     */
207    public <T> T execute(JPAExecutor<T> executor) throws JPAExecutorException {
208        EntityManager em = getEntityManager();
209        Instrumentation.Cron cron = new Instrumentation.Cron();
210        try {
211            LOG.trace("Executing JPAExecutor [{0}]", executor.getName());
212            if (instr != null) {
213                instr.incr(INSTRUMENTATION_GROUP, executor.getName(), 1);
214            }
215            cron.start();
216            em.getTransaction().begin();
217            T t = executor.execute(em);
218            if (em.getTransaction().isActive()) {
219                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
220                    throw new RuntimeException("Skipping Commit for Failover Testing");
221                }
222
223                em.getTransaction().commit();
224            }
225            return t;
226        }
227        catch (PersistenceException e) {
228            throw new JPAExecutorException(ErrorCode.E0603, e);
229        }
230        finally {
231            cron.stop();
232            if (instr != null) {
233                instr.addCron(INSTRUMENTATION_GROUP, executor.getName(), cron);
234            }
235            try {
236                if (em.getTransaction().isActive()) {
237                    LOG.warn("JPAExecutor [{0}] ended with an active transaction, rolling back", executor.getName());
238                    em.getTransaction().rollback();
239                }
240            }
241            catch (Exception ex) {
242                LOG.warn("Could not check/rollback transaction after JPAExecutor [{0}], {1}", executor.getName(), ex
243                        .getMessage(), ex);
244            }
245            try {
246                if (em.isOpen()) {
247                    em.close();
248                }
249                else {
250                    LOG.warn("JPAExecutor [{0}] closed the EntityManager, it should not!", executor.getName());
251                }
252            }
253            catch (Exception ex) {
254                LOG.warn("Could not close EntityManager after JPAExecutor [{0}], {1}", executor.getName(), ex
255                        .getMessage(), ex);
256            }
257        }
258    }
259
260    /**
261     * Return an EntityManager. Used by the StoreService. Once the StoreService is removed this method must be removed.
262     *
263     * @return an entity manager
264     */
265    EntityManager getEntityManager() {
266        return factory.createEntityManager();
267    }
268
269}