001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.commons.logging.LogFactory;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.hadoop.util.VersionInfo;
024import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
025import org.apache.oozie.util.DateUtils;
026import org.apache.oozie.util.XLog;
027import org.apache.oozie.util.Instrumentable;
028import org.apache.oozie.util.IOUtils;
029import org.apache.oozie.ErrorCode;
030
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.LinkedHashMap;
034import java.util.List;
035import java.util.Map;
036import java.io.IOException;
037import java.io.File;
038
039/**
040 * Services is a singleton that manages the lifecycle of all registered {@link Services}. <p/> It has 2 built in
041 * services: {@link XLogService} and {@link ConfigurationService}. <p/> The rest of the services are loaded from the
042 * {@link #CONF_SERVICE_CLASSES} configuration property. The services class names must be separated by commas (spaces
043 * and enters are allowed). <p/> The {@link #CONF_SYSTEM_MODE} configuration property is any of
044 * NORMAL/SAFEMODE/NOWEBSERVICE. <p/> Services are loaded and initialized in the order they are defined in the in
045 * configuration property. <p/> After all services are initialized, if the Instrumentation service is present, all
046 * services that implement the {@link Instrumentable} are instrumented. <p/> Services are destroyed in reverse order.
047 * <p/> If services initialization fail, initialized services are immediatly destroyed.
048 */
049public class Services {
050    private static final int MAX_SYSTEM_ID_LEN = 10;
051
052    /**
053     * Environment variable that indicates the location of the Oozie home directory.
054     * The Oozie home directory is used to pick up the conf/ directory from
055     */
056    public static final String OOZIE_HOME_DIR = "oozie.home.dir";
057
058    public static final String CONF_SYSTEM_ID = "oozie.system.id";
059
060    public static final String CONF_SERVICE_CLASSES = "oozie.services";
061
062    public static final String CONF_SERVICE_EXT_CLASSES = "oozie.services.ext";
063
064    public static final String CONF_SYSTEM_MODE = "oozie.systemmode";
065
066    public static final String CONF_DELETE_RUNTIME_DIR = "oozie.delete.runtime.dir.on.shutdown";
067
068    private static Services SERVICES;
069
070    private SYSTEM_MODE systemMode;
071    private String runtimeDir;
072    private Configuration conf;
073    private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
074    private String systemId;
075    private static String oozieHome;
076
077    public static void setOozieHome() throws ServiceException {
078        oozieHome = System.getProperty(OOZIE_HOME_DIR);
079        if (oozieHome == null) {
080            throw new ServiceException(ErrorCode.E0000);
081        }
082        if (!oozieHome.startsWith("/")) {
083            throw new ServiceException(ErrorCode.E0003, oozieHome);
084        }
085        File file = new File(oozieHome);
086        if (!file.exists()) {
087            throw new ServiceException(ErrorCode.E0004, oozieHome);
088        }
089    }
090
091    public static String getOozieHome() throws ServiceException {
092        return oozieHome;
093    }
094
095    /**
096     * Create a services. <p/> The built in services are initialized.
097     *
098     * @throws ServiceException thrown if any of the built in services could not initialize.
099     */
100    public Services() throws ServiceException {
101        setOozieHome();
102        if (SERVICES != null) {
103            XLog log = XLog.getLog(getClass());
104            log.warn(XLog.OPS, "Previous services singleton active, destroying it");
105            SERVICES.destroy();
106            SERVICES = null;
107        }
108        setServiceInternal(XLogService.class, false);
109        setServiceInternal(ConfigurationService.class, true);
110        conf = get(ConfigurationService.class).getConf();
111        DateUtils.setConf(conf);
112        if (!DateUtils.getOozieProcessingTimeZone().equals(DateUtils.UTC)) {
113            XLog.getLog(getClass()).warn("Oozie configured to work in a timezone other than UTC: {0}",
114                                         DateUtils.getOozieProcessingTimeZone().getID());
115        }
116        systemId = conf.get(CONF_SYSTEM_ID, ("oozie-" + System.getProperty("user.name")));
117        if (systemId.length() > MAX_SYSTEM_ID_LEN) {
118            systemId = systemId.substring(0, MAX_SYSTEM_ID_LEN);
119            XLog.getLog(getClass()).warn("System ID [{0}] exceeds maximum length [{1}], trimming", systemId,
120                                         MAX_SYSTEM_ID_LEN);
121        }
122        setSystemMode(SYSTEM_MODE.valueOf(conf.get(CONF_SYSTEM_MODE, SYSTEM_MODE.NORMAL.toString())));
123        runtimeDir = createRuntimeDir();
124    }
125
126    private String createRuntimeDir() throws ServiceException {
127        try {
128            File file = File.createTempFile(getSystemId(), ".dir");
129            file.delete();
130            if (!file.mkdir()) {
131                ServiceException ex = new ServiceException(ErrorCode.E0001, file.getAbsolutePath());
132                XLog.getLog(getClass()).fatal(ex);
133                throw ex;
134            }
135            XLog.getLog(getClass()).info("Initialized runtime directory [{0}]", file.getAbsolutePath());
136            return file.getAbsolutePath();
137        }
138        catch (IOException ex) {
139            ServiceException sex = new ServiceException(ErrorCode.E0001, "", ex);
140            XLog.getLog(getClass()).fatal(ex);
141            throw sex;
142        }
143    }
144
145    /**
146     * Return active system mode. <p/> .
147     *
148     * @return
149     */
150
151    public SYSTEM_MODE getSystemMode() {
152        return systemMode;
153    }
154
155    /**
156     * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
157     * new directory per Services initialization.
158     *
159     * @return the runtime directory of the Oozie instance.
160     */
161    public String getRuntimeDir() {
162        return runtimeDir;
163    }
164
165    /**
166     * Return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
167     *
168     * @return the system ID, the value defined in the {@link #CONF_SYSTEM_ID} configuration property.
169     */
170    public String getSystemId() {
171        return systemId;
172    }
173
174    /**
175     * Set and set system mode.
176     *
177     * @param sysMode system mode
178     */
179
180    public synchronized void setSystemMode(SYSTEM_MODE sysMode) {
181        if (this.systemMode != sysMode) {
182            XLog log = XLog.getLog(getClass());
183            log.info(XLog.OPS, "Exiting " + this.systemMode + " Entering " + sysMode);
184        }
185        this.systemMode = sysMode;
186    }
187
188    /**
189     * Return the services configuration.
190     *
191     * @return services configuraiton.
192     */
193    public Configuration getConf() {
194        return conf;
195    }
196
197    /**
198     * Initialize all services define in the {@link #CONF_SERVICE_CLASSES} configuration property.
199     *
200     * @throws ServiceException thrown if any of the services could not initialize.
201     */
202    @SuppressWarnings("unchecked")
203    public void init() throws ServiceException {
204        XLog log = new XLog(LogFactory.getLog(getClass()));
205        log.trace("Initializing");
206        SERVICES = this;
207        try {
208            loadServices();
209        }
210        catch (RuntimeException rex) {
211            XLog.getLog(getClass()).fatal(rex.getMessage(), rex);
212            throw rex;
213        }
214        catch (ServiceException ex) {
215            XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
216            SERVICES = null;
217            throw ex;
218        }
219        InstrumentationService instrService = get(InstrumentationService.class);
220        if (instrService != null) {
221            for (Service service : services.values()) {
222                if (service instanceof Instrumentable) {
223                    ((Instrumentable) service).instrument(instrService.get());
224                }
225            }
226        }
227        log.info("Initialized");
228        log.info("Running with JARs for Hadoop version [{0}]", VersionInfo.getVersion());
229        log.info("Oozie System ID [{0}] started!", getSystemId());
230    }
231
232    /**
233     * Loads the specified services.
234     *
235     * @param classes services classes to load.
236     * @param list    list of loaded service in order of appearance in the
237     *                configuration.
238     * @throws ServiceException thrown if a service class could not be loaded.
239     */
240    private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
241        XLog log = new XLog(LogFactory.getLog(getClass()));
242        for (Class klass : classes) {
243            try {
244                Service service = (Service) klass.newInstance();
245                log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
246                        service.getClass());
247                if (!service.getInterface().isInstance(service)) {
248                    throw new ServiceException(ErrorCode.E0101, klass, service.getInterface().getName());
249                }
250                list.add(service);
251            } catch (ServiceException ex) {
252                throw ex;
253            } catch (Exception ex) {
254                throw new ServiceException(ErrorCode.E0102, klass, ex.getMessage(), ex);
255            }
256        }
257    }
258
259    /**
260     * Loads services defined in <code>services</code> and
261     * <code>services.ext</code> and de-dups them.
262     *
263     * @return List of final services to initialize.
264     * @throws ServiceException throw if the services could not be loaded.
265     */
266    private void loadServices() throws ServiceException {
267        XLog log = new XLog(LogFactory.getLog(getClass()));
268        try {
269            Map<Class, Service> map = new LinkedHashMap<Class, Service>();
270            Class[] classes = conf.getClasses(CONF_SERVICE_CLASSES);
271            log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
272            Class[] classesExt = conf.getClasses(CONF_SERVICE_EXT_CLASSES);
273            log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
274            List<Service> list = new ArrayList<Service>();
275            loadServices(classes, list);
276            loadServices(classesExt, list);
277
278            //removing duplicate services, strategy: last one wins
279            for (Service service : list) {
280                if (map.containsKey(service.getInterface())) {
281                    log.debug("Replacing service [{0}] implementation [{1}]", service.getInterface(),
282                            service.getClass());
283                }
284                map.put(service.getInterface(), service);
285            }
286            for (Map.Entry<Class, Service> entry : map.entrySet()) {
287                setService(entry.getValue().getClass());
288            }
289        } catch (RuntimeException rex) {
290            log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'");
291            throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
292        }
293    }
294
295    /**
296     * Destroy all services.
297     */
298    public void destroy() {
299        XLog log = new XLog(LogFactory.getLog(getClass()));
300        log.trace("Shutting down");
301        boolean deleteRuntimeDir = false;
302        if (conf != null) {
303            deleteRuntimeDir = conf.getBoolean(CONF_DELETE_RUNTIME_DIR, false);
304        }
305        if (services != null) {
306            List<Service> list = new ArrayList<Service>(services.values());
307            Collections.reverse(list);
308            for (Service service : list) {
309                try {
310                    log.trace("Destroying service[{0}]", service.getInterface());
311                    if (service.getInterface() == XLogService.class) {
312                        log.info("Shutdown");
313                    }
314                    service.destroy();
315                }
316                catch (Throwable ex) {
317                    log.error("Error destroying service[{0}], {1}", service.getInterface(), ex.getMessage(), ex);
318                }
319            }
320        }
321        if (deleteRuntimeDir) {
322            try {
323                IOUtils.delete(new File(runtimeDir));
324            }
325            catch (IOException ex) {
326                log.error("Error deleting runtime directory [{0}], {1}", runtimeDir, ex.getMessage(), ex);
327            }
328        }
329        services = null;
330        conf = null;
331        SERVICES = null;
332    }
333
334    /**
335     * Return a service by its public interface.
336     *
337     * @param serviceKlass service public interface.
338     * @return the associated service, or <code>null</code> if not define.
339     */
340    @SuppressWarnings("unchecked")
341    public <T extends Service> T get(Class<T> serviceKlass) {
342        return (T) services.get(serviceKlass);
343    }
344
345    /**
346     * Set a service programmatically. <p/> The service will be initialized by the services. <p/> If a service is
347     * already defined with the same public interface it will be destroyed.
348     *
349     * @param klass service klass
350     * @throws ServiceException if the service could not be initialized, at this point all services have been
351     * destroyed.
352     */
353    public void setService(Class<? extends Service> klass) throws ServiceException {
354        setServiceInternal(klass, true);
355    }
356
357    private void setServiceInternal(Class<? extends Service> klass, boolean logging) throws ServiceException {
358        try {
359            Service newService = (Service) ReflectionUtils.newInstance(klass, null);
360            Service oldService = services.get(newService.getInterface());
361            if (oldService != null) {
362                oldService.destroy();
363            }
364            if (logging) {
365                XLog log = new XLog(LogFactory.getLog(getClass()));
366                log.trace("Initializing service[{0}] class[{1}]", newService.getInterface(), newService.getClass());
367            }
368            newService.init(this);
369            services.put(newService.getInterface(), newService);
370        }
371        catch (ServiceException ex) {
372            XLog.getLog(getClass()).fatal(ex.getMessage(), ex);
373            destroy();
374            throw ex;
375        }
376    }
377
378    /**
379     * Return the services singleton.
380     *
381     * @return services singleton, <code>null</code> if not initialized.
382     */
383    public static Services get() {
384        return SERVICES;
385    }
386
387}