001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency;
019
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.util.Arrays;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hive.conf.HiveConf;
032import org.apache.hadoop.security.UserGroupInformation;
033import org.apache.hcatalog.api.ConnectionFailureException;
034import org.apache.hcatalog.api.HCatClient;
035import org.apache.hcatalog.api.HCatPartition;
036import org.apache.hcatalog.common.HCatException;
037import org.apache.oozie.ErrorCode;
038import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
039import org.apache.oozie.action.hadoop.LauncherURIHandler;
040import org.apache.oozie.dependency.hcat.HCatMessageHandler;
041import org.apache.oozie.service.HCatAccessorException;
042import org.apache.oozie.service.HCatAccessorService;
043import org.apache.oozie.service.PartitionDependencyManagerService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.service.URIHandlerService;
046import org.apache.oozie.util.HCatURI;
047import org.apache.oozie.util.XLog;
048
049public class HCatURIHandler implements URIHandler {
050
051    private Set<String> supportedSchemes;
052    private Map<String, DependencyType> dependencyTypes;
053    private List<Class<?>> classesToShip;
054
055    @Override
056    public void init(Configuration conf) {
057        dependencyTypes = new HashMap<String, DependencyType>();
058        supportedSchemes = new HashSet<String>();
059        String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
060                + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
061        supportedSchemes.addAll(Arrays.asList(schemes));
062        classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
063    }
064
065    @Override
066    public Set<String> getSupportedSchemes() {
067        return supportedSchemes;
068    }
069
070    @Override
071    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
072        return HCatLauncherURIHandler.class;
073    }
074
075    @Override
076    public List<Class<?>> getClassesForLauncher() {
077        return classesToShip;
078    }
079
080    @Override
081    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
082        DependencyType depType = DependencyType.PULL;
083        // Not initializing in constructor as this will be part of oozie.services.ext
084        // and will be initialized after URIHandlerService
085        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
086        if (hcatService != null) {
087            depType = dependencyTypes.get(uri.getAuthority());
088            if (depType == null) {
089                depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
090                dependencyTypes.put(uri.getAuthority(), depType);
091            }
092        }
093        return depType;
094    }
095
096    @Override
097    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
098            throws URIHandlerException {
099        HCatURI hcatURI;
100        try {
101            hcatURI = new HCatURI(uri);
102        }
103        catch (URISyntaxException e) {
104            throw new URIHandlerException(ErrorCode.E0906, uri, e);
105        }
106        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
107        if (!hcatService.isRegisteredForNotification(hcatURI)) {
108            HCatClient client = getHCatClient(uri, conf, user);
109            try {
110                String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
111                if (topic == null) {
112                    return;
113                }
114                hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
115            }
116            catch (HCatException e) {
117                throw new HCatAccessorException(ErrorCode.E1501, e);
118            }
119            finally {
120                closeQuietly(client, true);
121            }
122        }
123        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
124        pdmService.addMissingDependency(hcatURI, actionID);
125    }
126
127    @Override
128    public boolean unregisterFromNotification(URI uri, String actionID) {
129        HCatURI hcatURI;
130        try {
131            hcatURI = new HCatURI(uri);
132        }
133        catch (URISyntaxException e) {
134            throw new RuntimeException(e); // Unexpected at this point
135        }
136        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
137        return pdmService.removeMissingDependency(hcatURI, actionID);
138    }
139
140    @Override
141    public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
142        HCatClient client = getHCatClient(uri, conf, user);
143        return new HCatContext(conf, user, client);
144    }
145
146    @Override
147    public boolean exists(URI uri, Context context) throws URIHandlerException {
148        HCatClient client = ((HCatContext) context).getHCatClient();
149        return exists(uri, client, false);
150    }
151
152    @Override
153    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
154        HCatClient client = getHCatClient(uri, conf, user);
155        return exists(uri, client, true);
156    }
157
158    @Override
159    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
160        return uri;
161    }
162
163    @Override
164    public void validate(String uri) throws URIHandlerException {
165        try {
166            new HCatURI(uri); // will fail if uri syntax is incorrect
167        }
168        catch (URISyntaxException e) {
169            throw new URIHandlerException(ErrorCode.E0906, uri, e);
170        }
171
172    }
173
174    @Override
175    public void destroy() {
176
177    }
178
179    private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException {
180        final HiveConf hiveConf = new HiveConf(conf, this.getClass());
181        String serverURI = getMetastoreConnectURI(uri);
182        if (!serverURI.equals("")) {
183            hiveConf.set("hive.metastore.local", "false");
184        }
185        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
186        try {
187            XLog.getLog(HCatURIHandler.class).info(
188                    "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
189                    UserGroupInformation.getLoginUser(), serverURI);
190
191            // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
192            // We are good to connect as the oozie user since listPartitions does not require
193            // authorization
194            /*
195            UserGroupInformation ugi = ugiService.getProxyUser(user);
196            return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
197                public HCatClient run() throws Exception {
198                    return HCatClient.create(hiveConf);
199                }
200            });
201            */
202
203            return HCatClient.create(hiveConf);
204        }
205        catch (HCatException e) {
206            throw new HCatAccessorException(ErrorCode.E1501, e);
207        }
208        catch (IOException e) {
209            throw new HCatAccessorException(ErrorCode.E1501, e);
210        }
211
212    }
213
214    private String getMetastoreConnectURI(URI uri) {
215        String metastoreURI;
216        // For unit tests
217        if (uri.getAuthority().equals("unittest-local")) {
218            metastoreURI = "";
219        }
220        else {
221            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
222            // is added
223            metastoreURI = "thrift://" + uri.getAuthority();
224        }
225        return metastoreURI;
226    }
227
228    private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
229        try {
230            HCatURI hcatURI = new HCatURI(uri.toString());
231            List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
232                    hcatURI.getPartitionMap());
233            return (partitions != null && !partitions.isEmpty());
234        }
235        catch (ConnectionFailureException e) {
236            throw new HCatAccessorException(ErrorCode.E1501, e);
237        }
238        catch (HCatException e) {
239            throw new HCatAccessorException(ErrorCode.E0902, e);
240        }
241        catch (URISyntaxException e) {
242            throw new HCatAccessorException(ErrorCode.E0902, e);
243        }
244        finally {
245            closeQuietly(client, closeClient);
246        }
247    }
248
249    private void closeQuietly(HCatClient client, boolean close) {
250        if (close && client != null) {
251            try {
252                client.close();
253            }
254            catch (Exception ignore) {
255                XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
256            }
257        }
258    }
259
260    static class HCatContext extends Context {
261
262        private HCatClient hcatClient;
263
264        /**
265         * Create a HCatContext that can be used to access a hcat URI
266         *
267         * @param conf Configuration to access the URI
268         * @param user name of the user the URI should be accessed as
269         * @param hcatClient HCatClient to talk to hcatalog server
270         */
271        public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
272            super(conf, user);
273            this.hcatClient = hcatClient;
274        }
275
276        /**
277         * Get the HCatClient to talk to hcatalog server
278         *
279         * @return HCatClient to talk to hcatalog server
280         */
281        public HCatClient getHCatClient() {
282            return hcatClient;
283        }
284
285        @Override
286        public void destroy() {
287            try {
288                hcatClient.close();
289            }
290            catch (Exception ignore) {
291                XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
292            }
293        }
294
295    }
296
297}