001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency.hcat;
019
020import java.net.URL;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029
030import net.sf.ehcache.Cache;
031import net.sf.ehcache.CacheException;
032import net.sf.ehcache.CacheManager;
033import net.sf.ehcache.Ehcache;
034import net.sf.ehcache.Element;
035import net.sf.ehcache.config.CacheConfiguration;
036import net.sf.ehcache.event.CacheEventListener;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.oozie.service.HCatAccessorService;
040import org.apache.oozie.service.PartitionDependencyManagerService;
041import org.apache.oozie.service.Services;
042import org.apache.oozie.util.HCatURI;
043import org.apache.oozie.util.XLog;
044
045public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
046
047    private static XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
048    private static String TABLE_DELIMITER = "#";
049    private static String PARTITION_DELIMITER = ";";
050
051    public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
052
053    private CacheManager cacheManager;
054
055    /**
056     * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
057     * WaitingAction) which is Serializable (for overflowToDisk)
058     */
059    private ConcurrentMap<String, Cache> missingDepsByServer;
060
061    private CacheConfiguration cacheConfig;
062    /**
063     * Map of server#db#table - sorted part key pattern - count of different partition values (count
064     * of elements in the cache) still missing for a partition key pattern. This count is used to
065     * quickly determine if there are any more missing dependencies for a table. When the count
066     * becomes 0, we unregister from notifications as there are no more missing dependencies for
067     * that table.
068     */
069    private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
070    /**
071     * Map of actionIDs and collection of available URIs
072     */
073    private ConcurrentMap<String, Collection<String>> availableDeps;
074
075    @Override
076    public void init(Configuration conf) {
077        String cacheName = conf.get(CONF_CACHE_NAME);
078        URL cacheConfigURL;
079        if (cacheName == null) {
080            cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache-default.xml");
081            cacheName = "dependency-default";
082        }
083        else {
084            cacheConfigURL = this.getClass().getClassLoader().getResource("ehcache.xml");
085        }
086        if (cacheConfigURL == null) {
087            throw new IllegalStateException("ehcache.xml is not found in classpath");
088        }
089        cacheManager = CacheManager.newInstance(cacheConfigURL);
090        final Cache specifiedCache = cacheManager.getCache(cacheName);
091        if (specifiedCache == null) {
092            throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
093                    + " is not found");
094        }
095        cacheConfig = specifiedCache.getCacheConfiguration();
096        missingDepsByServer = new ConcurrentHashMap<String, Cache>();
097        partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
098        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
099    }
100
101    @Override
102    public void addMissingDependency(HCatURI hcatURI, String actionID) {
103
104        // Create cache for the server if we don't have one
105        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
106        if (missingCache == null) {
107            CacheConfiguration clonedConfig = cacheConfig.clone();
108            clonedConfig.setName(hcatURI.getServer());
109            missingCache = new Cache(clonedConfig);
110            Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
111            if (exists == null) {
112                cacheManager.addCache(missingCache);
113                missingCache.getCacheEventNotificationService().registerListener(this);
114            }
115            else {
116                missingCache.dispose(); //discard
117            }
118        }
119
120        // Add hcat uri into the missingCache
121        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
122        String partKeys = sortedPKV.getPartKeys();
123        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
124                + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
125        boolean newlyAdded = true;
126        synchronized (missingCache) {
127            Element element = missingCache.get(missingKey);
128            if (element == null) {
129                WaitingActions waitingActions = new WaitingActions();
130                element = new Element(missingKey, waitingActions);
131                Element exists = missingCache.putIfAbsent(element);
132                if (exists != null) {
133                    newlyAdded = false;
134                    waitingActions = (WaitingActions) exists.getObjectValue();
135                }
136                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
137            }
138            else {
139                newlyAdded = false;
140                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
141                waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
142            }
143        }
144
145        // Increment count for the partition key pattern
146        if (newlyAdded) {
147            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
148                    + hcatURI.getTable();
149            synchronized (partKeyPatterns) {
150                ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
151                if (patternCounts == null) {
152                    patternCounts = new ConcurrentHashMap<String, SettableInteger>();
153                    partKeyPatterns.put(tableKey, patternCounts);
154                }
155                SettableInteger count = patternCounts.get(partKeys);
156                if (count == null) {
157                    patternCounts.put(partKeys, new SettableInteger(1));
158                }
159                else {
160                    count.increment();
161                }
162            }
163        }
164    }
165
166    @Override
167    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
168
169        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
170        if (missingCache == null) {
171            LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
172                    hcatURI.toURIString(), actionID);
173            return false;
174        }
175        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
176        String partKeys = sortedPKV.getPartKeys();
177        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
178                partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
179        boolean decrement = false;
180        boolean removed = false;
181        synchronized (missingCache) {
182            Element element = missingCache.get(missingKey);
183            if (element == null) {
184                LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
185                        hcatURI.toURIString(), actionID);
186                return false;
187            }
188            Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
189            removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
190            if (!removed) {
191                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
192                        hcatURI.toURIString(), actionID);
193            }
194            if (waitingActions.isEmpty()) {
195                missingCache.remove(missingKey);
196                decrement = true;
197            }
198        }
199        // Decrement partition key pattern count if the cache entry is removed
200        if (decrement) {
201            String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
202                    + hcatURI.getTable();
203            decrementPartKeyPatternCount(tableKey, partKeys, hcatURI.toURIString());
204        }
205        return removed;
206    }
207
208    @Override
209    public Collection<String> getWaitingActions(HCatURI hcatURI) {
210        Collection<String> actionIDs = null;
211        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
212        if (missingCache != null) {
213            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
214            String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
215                    + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
216            Element element = missingCache.get(missingKey);
217            if (element != null) {
218                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
219                actionIDs = new ArrayList<String>();
220                String uriString = hcatURI.getURI().toString();
221                for (WaitingAction action : waitingActions.getWaitingActions()) {
222                    if (action.getDependencyURI().equals(uriString)) {
223                        actionIDs.add(action.getActionID());
224                    }
225                }
226            }
227        }
228        return actionIDs;
229    }
230
231    @Override
232    public Collection<String> markDependencyAvailable(String server, String db, String table,
233            Map<String, String> partitions) {
234        String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
235        synchronized (partKeyPatterns) {
236            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
237            if (patternCounts == null) {
238                LOG.warn("Got partition available notification for " + tableKey
239                        + ". Unexpected as no matching partition keys. Unregistering topic");
240                unregisterFromNotifications(server, db, table);
241                return null;
242            }
243            Cache missingCache = missingDepsByServer.get(server);
244            if (missingCache == null) {
245                LOG.warn("Got partition available notification for " + tableKey
246                        + ". Unexpected. Missing server entry in cache. Unregistering topic");
247                partKeyPatterns.remove(tableKey);
248                unregisterFromNotifications(server, db, table);
249                return null;
250            }
251            Collection<String> actionsWithAvailDep = new HashSet<String>();
252            StringBuilder partValSB = new StringBuilder();
253            // If partition patterns are date, date;country and date;country;state,
254            // construct the partition values for each pattern and for the matching value in the
255            // missingCache, get the waiting actions and mark it as available.
256            for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
257                String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
258                partValSB.setLength(0);
259                for (String key : partKeys) {
260                    partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
261                }
262                partValSB.setLength(partValSB.length() - 1);
263                String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
264                        + partValSB.toString();
265                boolean removed = false;
266                Element element = null;
267                synchronized (missingCache) {
268                    element = missingCache.get(missingKey);
269                    if (element != null) {
270                        missingCache.remove(missingKey);
271                        removed = true;
272                    }
273                }
274                if (removed) {
275                    decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
276                    // Add the removed entry to available dependencies
277                    Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
278                            .getWaitingActions();
279                    for (WaitingAction wAction : wActions) {
280                        String actionID = wAction.getActionID();
281                        actionsWithAvailDep.add(actionID);
282                        Collection<String> depURIs = availableDeps.get(actionID);
283                        if (depURIs == null) {
284                            depURIs = new ArrayList<String>();
285                            Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
286                            if (existing != null) {
287                                depURIs = existing;
288                            }
289                        }
290                        synchronized (depURIs) {
291                            depURIs.add(wAction.getDependencyURI());
292                            availableDeps.put(actionID, depURIs);
293                        }
294                    }
295                }
296            }
297            return actionsWithAvailDep;
298        }
299    }
300
301    @Override
302    public Collection<String> getAvailableDependencyURIs(String actionID) {
303        Collection<String> available = availableDeps.get(actionID);
304        if (available !=  null) {
305            // Return a copy
306            available = new ArrayList<String>(available);
307        }
308        return available;
309    }
310
311    @Override
312    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
313        if (!availableDeps.containsKey(actionID)) {
314            return false;
315        }
316        else {
317            Collection<String> availList = availableDeps.get(actionID);
318            if (!availList.removeAll(dependencyURIs)) {
319                return false;
320            }
321            synchronized (availList) {
322                if (availList.isEmpty()) {
323                    availableDeps.remove(actionID);
324                }
325            }
326        }
327        return true;
328    }
329
330    @Override
331    public void destroy() {
332        availableDeps.clear();
333        cacheManager.shutdown();
334    }
335
336    @Override
337    public Object clone() throws CloneNotSupportedException {
338        throw new CloneNotSupportedException();
339    }
340
341    @Override
342    public void dispose() {
343    }
344
345    @Override
346    public void notifyElementExpired(Ehcache cache, Element element) {
347        // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
348        String missingDepKey = (String) element.getObjectKey();
349        LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
350        onExpiryOrEviction(cache, element, missingDepKey);
351    }
352
353    @Override
354    public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {
355
356    }
357
358    @Override
359    public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
360    }
361
362    @Override
363    public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
364    }
365
366    @Override
367    public void notifyRemoveAll(Ehcache arg0) {
368    }
369
370    @Override
371    public void notifyElementEvicted(Ehcache cache, Element element) {
372        // Invoked when maxElementsInMemory is met
373        String missingDepKey = (String) element.getObjectKey();
374        LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
375        onExpiryOrEviction(cache, element, missingDepKey);
376    }
377
378    private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
379        int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
380        int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
381        // server#db#table. Name of the cache is that of the server.
382        String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
383        String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
384        decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
385    }
386
387    /**
388     * Decrement partition key pattern count, once a hcat URI is removed from the cache
389     *
390     * @param tableKey key identifying the table - server#db#table
391     * @param partKeys partition key pattern
392     * @param hcatURI URI with the partition key pattern
393     */
394    private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
395        synchronized (partKeyPatterns) {
396            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
397            if (patternCounts == null) {
398                LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
399                        + "But no corresponding pattern key table entry", hcatURI);
400            }
401            else {
402                SettableInteger count = patternCounts.get(partKeys);
403                if (count == null) {
404                    LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
405                            + "But no corresponding pattern key entry", hcatURI);
406                }
407                else {
408                    count.decrement();
409                    if (count.getValue() == 0) {
410                        patternCounts.remove(partKeys);
411                    }
412                    if (patternCounts.isEmpty()) {
413                        partKeyPatterns.remove(tableKey);
414                        String[] tableDetails = tableKey.split(TABLE_DELIMITER);
415                        unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
416                    }
417                }
418            }
419        }
420    }
421
422    private void unregisterFromNotifications(String server, String db, String table) {
423        // Close JMS session. Stop listening on topic
424        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
425        hcatService.unregisterFromNotification(server, db, table);
426    }
427
428    private static class SortedPKV {
429        private StringBuilder partKeys;
430        private StringBuilder partVals;
431
432        public SortedPKV(Map<String, String> partitions) {
433            this.partKeys = new StringBuilder();
434            this.partVals = new StringBuilder();
435            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
436            Collections.sort(keys);
437            for (String key : keys) {
438                this.partKeys.append(key).append(PARTITION_DELIMITER);
439                this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
440            }
441            this.partKeys.setLength(partKeys.length() - 1);
442            this.partVals.setLength(partVals.length() - 1);
443        }
444
445        public String getPartKeys() {
446            return partKeys.toString();
447        }
448
449        public String getPartVals() {
450            return partVals.toString();
451        }
452
453    }
454
455    private static class SettableInteger {
456        private int value;
457
458        public SettableInteger(int value) {
459            this.value = value;
460        }
461
462        public int getValue() {
463            return value;
464        }
465
466        public void increment() {
467            value++;
468        }
469
470        public void decrement() {
471            value--;
472        }
473    }
474
475}