001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency.hcat;
019
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.oozie.service.HCatAccessorService;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.util.HCatURI;
035import org.apache.oozie.util.XLog;
036
037public class SimpleHCatDependencyCache implements HCatDependencyCache {
038
039    private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
040    private static String DELIMITER = ";";
041
042    /**
043     * Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
044     * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
045     * string).
046     */
047    private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
048
049    /**
050     * Map of actionIDs and collection of available URIs
051     */
052    private ConcurrentMap<String, Collection<String>> availableDeps;
053
054    // TODO:
055    // Gather and print stats on cache hits and misses.
056
057    @Override
058    public void init(Configuration conf) {
059        missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
060        availableDeps = new ConcurrentHashMap<String, Collection<String>>();
061    }
062
063    @Override
064    public void addMissingDependency(HCatURI hcatURI, String actionID) {
065        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
066        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
067        // Partition keys seperated by ;. For eg: date;country;state
068        String partKey = sortedPKV.getPartKeys();
069        // Partition values seperated by ;. For eg: 20120101;US;CA
070        String partVal = sortedPKV.getPartVals();
071        ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
072        if (partKeyPatterns == null) {
073            partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
074            ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
075                    tableKey, partKeyPatterns);
076            if (existingMap != null) {
077                partKeyPatterns = existingMap;
078            }
079        }
080        synchronized (partKeyPatterns) {
081            missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
082            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
083            if (partValues == null) {
084                partValues = new HashMap<String, Collection<WaitingAction>>();
085                partKeyPatterns.put(partKey, partValues);
086            }
087            Collection<WaitingAction> waitingActions = partValues.get(partVal);
088            if (waitingActions == null) {
089                waitingActions = new HashSet<WaitingAction>();
090                partValues.put(partVal, waitingActions);
091            }
092            waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
093        }
094    }
095
096    @Override
097    public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
098        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
099        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
100        String partKey = sortedPKV.getPartKeys();
101        String partVal = sortedPKV.getPartVals();
102        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
103        if (partKeyPatterns == null) {
104            LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
105                    hcatURI.toURIString(), actionID);
106            return false;
107        }
108        synchronized(partKeyPatterns) {
109            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
110            if (partValues == null) {
111                LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
112                        hcatURI.toURIString(), actionID);
113                return false;
114            }
115            Collection<WaitingAction> waitingActions = partValues.get(partVal);
116            if (waitingActions == null) {
117                LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
118                        hcatURI.toURIString(), actionID);
119                return false;
120            }
121            boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
122            if (!removed) {
123                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
124                        hcatURI.toURIString(), actionID);
125            }
126            if (waitingActions.isEmpty()) {
127                partValues.remove(partVal);
128                if (partValues.isEmpty()) {
129                    partKeyPatterns.remove(partKey);
130                }
131                if (partKeyPatterns.isEmpty()) {
132                    missingDeps.remove(tableKey);
133                    // Close JMS session. Stop listening on topic
134                    HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
135                    hcatService.unregisterFromNotification(hcatURI);
136                }
137            }
138            return removed;
139        }
140    }
141
142    @Override
143    public Collection<String> getWaitingActions(HCatURI hcatURI) {
144        String tableKey = hcatURI.getServer() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable();
145        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
146        String partKey = sortedPKV.getPartKeys();
147        String partVal = sortedPKV.getPartVals();
148        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
149        if (partKeyPatterns == null) {
150            return null;
151        }
152        Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
153        if (partValues == null) {
154            return null;
155        }
156        Collection<WaitingAction> waitingActions = partValues.get(partVal);
157        if (waitingActions == null) {
158            return null;
159        }
160        Collection<String> actionIDs = new ArrayList<String>();
161        String uriString = hcatURI.toURIString();
162        for (WaitingAction action : waitingActions) {
163            if (action.getDependencyURI().equals(uriString)) {
164                actionIDs.add(action.getActionID());
165            }
166        }
167        return actionIDs;
168    }
169
170    @Override
171    public Collection<String> markDependencyAvailable(String server, String db, String table,
172            Map<String, String> partitions) {
173        String tableKey = server + DELIMITER + db + DELIMITER + table;
174        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
175        if (partKeyPatterns == null) {
176            LOG.warn("Got partition available notification for " + tableKey
177                    + ". Unexpected and should not be listening to topic. Unregistering topic");
178            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
179            hcatService.unregisterFromNotification(server, db, table);
180            return null;
181        }
182        Collection<String> actionsWithAvailDep = new HashSet<String>();
183        List<String> partKeysToRemove = new ArrayList<String>();
184        StringBuilder partValSB = new StringBuilder();
185        synchronized (partKeyPatterns) {
186            // If partition patterns are date, date;country and date;country;state,
187            // construct the partition values for each pattern from the available partitions map and
188            // for the matching value in the dependency map, get the waiting actions.
189            for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
190                String[] partKeys = entry.getKey().split(DELIMITER);
191                partValSB.setLength(0);
192                for (String key : partKeys) {
193                    partValSB.append(partitions.get(key)).append(DELIMITER);
194                }
195                partValSB.setLength(partValSB.length() - 1);
196
197                Map<String, Collection<WaitingAction>> partValues = entry.getValue();
198                String partVal = partValSB.toString();
199                Collection<WaitingAction> wActions = entry.getValue().get(partVal);
200                if (wActions == null)
201                    continue;
202                for (WaitingAction wAction : wActions) {
203                    String actionID = wAction.getActionID();
204                    actionsWithAvailDep.add(actionID);
205                    Collection<String> depURIs = availableDeps.get(actionID);
206                    if (depURIs == null) {
207                        depURIs = new ArrayList<String>();
208                        Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
209                        if (existing != null) {
210                            depURIs = existing;
211                        }
212                    }
213                    synchronized (depURIs) {
214                        depURIs.add(wAction.getDependencyURI());
215                        availableDeps.put(actionID, depURIs);
216                    }
217                }
218                partValues.remove(partVal);
219                if (partValues.isEmpty()) {
220                    partKeysToRemove.add(entry.getKey());
221                }
222            }
223            for (String partKey : partKeysToRemove) {
224                partKeyPatterns.remove(partKey);
225            }
226            if (partKeyPatterns.isEmpty()) {
227                missingDeps.remove(tableKey);
228                // Close JMS session. Stop listening on topic
229                HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
230                hcatService.unregisterFromNotification(server, db, table);
231            }
232        }
233        return actionsWithAvailDep;
234    }
235
236    @Override
237    public Collection<String> getAvailableDependencyURIs(String actionID) {
238        Collection<String> available = availableDeps.get(actionID);
239        if (available !=  null) {
240            // Return a copy
241            available = new ArrayList<String>(available);
242        }
243        return available;
244    }
245
246    @Override
247    public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
248        if (!availableDeps.containsKey(actionID)) {
249            return false;
250        }
251        else {
252            Collection<String> availList = availableDeps.get(actionID);
253            if (!availList.removeAll(dependencyURIs)) {
254                return false;
255            }
256            synchronized (availList) {
257                if (availList.isEmpty()) {
258                    availableDeps.remove(actionID);
259                }
260            }
261        }
262        return true;
263    }
264
265    @Override
266    public void destroy() {
267        missingDeps.clear();
268        availableDeps.clear();
269    }
270
271    private static class SortedPKV {
272        private StringBuilder partKeys;
273        private StringBuilder partVals;
274
275        public SortedPKV(Map<String, String> partitions) {
276            this.partKeys = new StringBuilder();
277            this.partVals = new StringBuilder();
278            ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
279            Collections.sort(keys);
280            for (String key : keys) {
281                this.partKeys.append(key).append(DELIMITER);
282                this.partVals.append(partitions.get(key)).append(DELIMITER);
283            }
284            this.partKeys.setLength(partKeys.length() - 1);
285            this.partVals.setLength(partVals.length() - 1);
286        }
287
288        public String getPartKeys() {
289            return partKeys.toString();
290        }
291
292        public String getPartVals() {
293            return partVals.toString();
294        }
295
296    }
297
298}