001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.io.BufferedReader;
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStreamReader;
026import java.net.URI;
027import java.util.HashSet;
028import java.util.Set;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.oozie.BundleJobBean;
034import org.apache.oozie.CoordinatorJobBean;
035import org.apache.oozie.ErrorCode;
036import org.apache.oozie.WorkflowJobBean;
037import org.apache.oozie.client.XOozieClient;
038import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
039import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
040import org.apache.oozie.executor.jpa.JPAExecutorException;
041import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
042import org.apache.oozie.util.ConfigUtils;
043import org.apache.oozie.util.Instrumentation;
044import org.apache.oozie.util.XLog;
045
046/**
047 * The authorization service provides all authorization checks.
048 */
049public class AuthorizationService implements Service {
050
    /**
     * Prefix shared by all configuration keys of this service.
     */
    public static final String CONF_PREFIX = Service.CONF_PREFIX + "AuthorizationService.";

    /**
     * Configuration parameter to enable or disable authorization.
     * <p/>
     * NOTE(review): {@link #init(Services)} passes this key to the deprecated-check lookup next to
     * {@link #CONF_AUTHORIZATION_ENABLED}; presumably this is the old name of that setting - confirm.
     */
    public static final String CONF_SECURITY_ENABLED = CONF_PREFIX + "security.enabled";

    /**
     * Configuration parameter to enable or disable authorization.
     */
    public static final String CONF_AUTHORIZATION_ENABLED = CONF_PREFIX + "authorization.enabled";

    /**
     * Configuration parameter to enable old behavior default group as ACL.
     */
    public static final String CONF_DEFAULT_GROUP_AS_ACL = CONF_PREFIX + "default.group.as.acl";

    /**
     * Configuration parameter to define admin groups, if NULL/empty the adminusers.txt file is used.
     */
    public static final String CONF_ADMIN_GROUPS = CONF_PREFIX + "admin.groups";

    /**
     * File that contains list of admin users for Oozie.
     */
    public static final String ADMIN_USERS_FILE = "adminusers.txt";

    // Instrumentation group and counter name used to record failed authorization checks.
    protected static final String INSTRUMENTATION_GROUP = "authorization";
    protected static final String INSTR_FAILED_AUTH_COUNTER = "authorization.failed";

    // Only one of the two sets below is created by init(): adminGroups when CONF_ADMIN_GROUPS is non-empty,
    // adminUsers otherwise. Both stay null when authorization is disabled.
    private Set<String> adminGroups;
    private Set<String> adminUsers;
    // Value of the authorization switch read in init().
    private boolean authorizationEnabled;
    // Value of CONF_DEFAULT_GROUP_AS_ACL; only read from the configuration when authorization is enabled.
    private boolean useDefaultGroupAsAcl;

    private final XLog log = XLog.getLog(getClass());
    // Set at the end of init(); incrCounter() tolerates it being null.
    private Instrumentation instrumentation;
088
089    private String[] getTrimmedStrings(String str) {
090        if (null == str || "".equals(str.trim())) {
091            return new String[0];
092        }
093        return str.trim().split("\\s*,\\s*");
094    }
095
096    /**
097     * Initialize the service. <p/> Reads the security related configuration. parameters - security enabled and list of
098     * super users.
099     *
100     * @param services services instance.
101     * @throws ServiceException thrown if the service could not be initialized.
102     */
103    public void init(Services services) throws ServiceException {
104        authorizationEnabled =
105            ConfigUtils.getWithDeprecatedCheck(services.getConf(), CONF_AUTHORIZATION_ENABLED,
106                                               CONF_SECURITY_ENABLED, false);
107        if (authorizationEnabled) {
108            log.info("Oozie running with authorization enabled");
109            useDefaultGroupAsAcl = Services.get().getConf().getBoolean(CONF_DEFAULT_GROUP_AS_ACL, false);
110            String[] str = getTrimmedStrings(Services.get().getConf().get(CONF_ADMIN_GROUPS));
111            if (str.length > 0) {
112                log.info("Admin users will be checked against the defined admin groups");
113                adminGroups = new HashSet<String>();
114                for (String s : str) {
115                    adminGroups.add(s.trim());
116                }
117            }
118            else {
119                log.info("Admin users will be checked against the 'adminusers.txt' file contents");
120                adminUsers = new HashSet<String>();
121                loadAdminUsers();
122            }
123        }
124        else {
125            log.warn("Oozie running with authorization disabled");
126        }
127        instrumentation = Services.get().get(InstrumentationService.class).get();
128    }
129
130    /**
131     * Return if security is enabled or not.
132     *
133     * @return if security is enabled or not.
134     */
135    @Deprecated
136    public boolean isSecurityEnabled() {
137        return authorizationEnabled;
138    }
139
140    public boolean useDefaultGroupAsAcl() {
141        return useDefaultGroupAsAcl;
142    }
143
144    /**
145     * Return if security is enabled or not.
146     *
147     * @return if security is enabled or not.
148     */
149    public boolean isAuthorizationEnabled() {
150        return isSecurityEnabled();
151    }
152
153    /**
154     * Load the list of admin users from {@link AuthorizationService#ADMIN_USERS_FILE} </p>
155     *
156     * @throws ServiceException if the admin user list could not be loaded.
157     */
158    private void loadAdminUsers() throws ServiceException {
159        String configDir = Services.get().get(ConfigurationService.class).getConfigDir();
160        if (configDir != null) {
161            File file = new File(configDir, ADMIN_USERS_FILE);
162            if (file.exists()) {
163                try {
164                    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
165                    try {
166                        String line = br.readLine();
167                        while (line != null) {
168                            line = line.trim();
169                            if (line.length() > 0 && !line.startsWith("#")) {
170                                adminUsers.add(line);
171                            }
172                            line = br.readLine();
173                        }
174                    }
175                    catch (IOException ex) {
176                        throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
177                    }
178                }
179                catch (FileNotFoundException ex) {
180                    throw new ServiceException(ErrorCode.E0160, file.getAbsolutePath(), ex);
181                }
182            }
183            else {
184                log.warn("Admin users file not available in config dir [{0}], running without admin users", configDir);
185            }
186        }
187        else {
188            log.warn("Reading configuration from classpath, running without admin users");
189        }
190    }
191
192    /**
193     * Destroy the service. <p/> This implementation does a NOP.
194     */
195    public void destroy() {
196    }
197
198    /**
199     * Return the public interface of the service.
200     *
201     * @return {@link AuthorizationService}.
202     */
203    public Class<? extends Service> getInterface() {
204        return AuthorizationService.class;
205    }
206
207    /**
208     * Check if the user belongs to the group or not.
209     *
210     * @param user user name.
211     * @param group group name.
212     * @return if the user belongs to the group or not.
213     * @throws AuthorizationException thrown if the authorization query can not be performed.
214     */
215    protected boolean isUserInGroup(String user, String group) throws AuthorizationException {
216        GroupsService groupsService = Services.get().get(GroupsService.class);
217        try {
218            return groupsService.getGroups(user).contains(group);
219        }
220        catch (IOException ex) {
221            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
222        }
223    }
224
225    /**
226     * Check if the user belongs to the group or not. <p/> <p/> Subclasses should override the {@link #isUserInGroup}
227     * method.
228     *
229     * @param user user name.
230     * @param group group name.
231     * @throws AuthorizationException thrown if the user is not authorized for the group or if the authorization query
232     * can not be performed.
233     */
234    public void authorizeForGroup(String user, String group) throws AuthorizationException {
235        if (authorizationEnabled && !isUserInGroup(user, group)) {
236            throw new AuthorizationException(ErrorCode.E0502, user, group);
237        }
238    }
239
240    /**
241     * Return the default group to which the user belongs. <p/> This implementation always returns 'users'.
242     *
243     * @param user user name.
244     * @return default group of user.
245     * @throws AuthorizationException thrown if the default group con not be retrieved.
246     */
247    public String getDefaultGroup(String user) throws AuthorizationException {
248        try {
249            return Services.get().get(GroupsService.class).getGroups(user).get(0);
250        }
251        catch (IOException ex) {
252            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
253        }
254    }
255
256    /**
257     * Check if the user has admin privileges. <p/> If admin is disabled it returns always <code>true</code>. <p/> If
258     * admin is enabled it returns <code>true</code> if the user is in the <code>adminusers.txt</code> file.
259     *
260     * @param user user name.
261     * @return if the user has admin privileges or not.
262     */
263    protected boolean isAdmin(String user) {
264        boolean admin = false;
265        if (adminUsers != null) {
266            admin = adminUsers.contains(user);
267        }
268        else {
269            for (String adminGroup : adminGroups) {
270                try {
271                    admin = isUserInGroup(user, adminGroup);
272                    if (admin) {
273                        break;
274                    }
275                }
276                catch (AuthorizationException ex) {
277                    log.warn("Admin check failed, " + ex.toString(), ex);
278                    break;
279                }
280            }
281        }
282        return admin;
283    }
284
285    /**
286     * Check if the user has admin privileges. <p/> Subclasses should override the {@link #isUserInGroup} method.
287     *
288     * @param user user name.
289     * @param write indicates if the check is for read or write admin tasks (in this implementation this is ignored)
290     * @throws AuthorizationException thrown if user does not have admin priviledges.
291     */
292    public void authorizeForAdmin(String user, boolean write) throws AuthorizationException {
293        if (authorizationEnabled && write && !isAdmin(user)) {
294            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
295            throw new AuthorizationException(ErrorCode.E0503, user);
296        }
297    }
298
299    /**
300     * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
301     * file system permissions on the workflow application.
302     *
303     * @param user user name.
304     * @param group group name.
305     * @param appPath application path.
306     * @throws AuthorizationException thrown if the user is not authorized for the app.
307     */
308    public void authorizeForApp(String user, String group, String appPath, Configuration jobConf)
309            throws AuthorizationException {
310        try {
311            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
312            URI uri = new Path(appPath).toUri();
313            Configuration fsConf = has.createJobConf(uri.getAuthority());
314            FileSystem fs = has.createFileSystem(user, uri, fsConf);
315
316            Path path = new Path(appPath);
317            try {
318                if (!fs.exists(path)) {
319                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
320                    throw new AuthorizationException(ErrorCode.E0504, appPath);
321                }
322                Path wfXml = new Path(path, "workflow.xml");
323                if (!fs.exists(wfXml)) {
324                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
325                    throw new AuthorizationException(ErrorCode.E0505, appPath);
326                }
327                if (!fs.isFile(wfXml)) {
328                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
329                    throw new AuthorizationException(ErrorCode.E0506, appPath);
330                }
331                fs.open(wfXml).close();
332            }
333            // TODO change this when stopping support of 0.18 to the new
334            // Exception
335            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
336                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
337                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
338            }
339        }
340        catch (IOException ex) {
341            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
342            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
343        }
344        catch (HadoopAccessorException e) {
345            throw new AuthorizationException(e);
346        }
347    }
348
349    /**
350     * Check if the user+group is authorized to use the specified application. <p/> The check is done by checking the
351     * file system permissions on the workflow application.
352     *
353     * @param user user name.
354     * @param group group name.
355     * @param appPath application path.
356     * @param fileName workflow or coordinator.xml
357     * @param conf
358     * @throws AuthorizationException thrown if the user is not authorized for the app.
359     */
360    public void authorizeForApp(String user, String group, String appPath, String fileName, Configuration conf)
361            throws AuthorizationException {
362        try {
363            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
364            URI uri = new Path(appPath).toUri();
365            Configuration fsConf = has.createJobConf(uri.getAuthority());
366            FileSystem fs = has.createFileSystem(user, uri, fsConf);
367
368            Path path = new Path(appPath);
369            try {
370                if (!fs.exists(path)) {
371                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
372                    throw new AuthorizationException(ErrorCode.E0504, appPath);
373                }
374                if (conf.get(XOozieClient.IS_PROXY_SUBMISSION) == null) { // Only further check existence of job definition files for non proxy submission jobs;
375                    if (!fs.isFile(path)) {
376                        Path appXml = new Path(path, fileName);
377                        if (!fs.exists(appXml)) {
378                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
379                            throw new AuthorizationException(ErrorCode.E0505, appPath);
380                        }
381                        if (!fs.isFile(appXml)) {
382                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
383                            throw new AuthorizationException(ErrorCode.E0506, appPath);
384                        }
385                        fs.open(appXml).close();
386                    }
387                }
388            }
389            // TODO change this when stopping support of 0.18 to the new
390            // Exception
391            catch (org.apache.hadoop.fs.permission.AccessControlException ex) {
392                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
393                throw new AuthorizationException(ErrorCode.E0507, appPath, ex.getMessage(), ex);
394            }
395        }
396        catch (IOException ex) {
397            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
398            throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
399        }
400        catch (HadoopAccessorException e) {
401            throw new AuthorizationException(e);
402        }
403    }
404
405    private boolean isUserInAcl(String user, String aclStr) throws IOException {
406        boolean userInAcl = false;
407        if (aclStr != null && aclStr.trim().length() > 0) {
408            GroupsService groupsService = Services.get().get(GroupsService.class);
409            String[] acl = aclStr.split(",");
410            for (int i = 0; !userInAcl && i < acl.length; i++) {
411                String aclItem = acl[i].trim();
412                userInAcl = aclItem.equals(user) || groupsService.getGroups(user).equals(aclItem);
413            }
414        }
415        return userInAcl;
416    }
417
418    /**
419     * Check if the user+group is authorized to operate on the specified job. <p/> Checks if the user is a super-user or
420     * the one who started the job. <p/> Read operations are allowed to all users.
421     *
422     * @param user user name.
423     * @param jobId job id.
424     * @param write indicates if the check is for read or write job tasks.
425     * @throws AuthorizationException thrown if the user is not authorized for the job.
426     */
427    public void authorizeForJob(String user, String jobId, boolean write) throws AuthorizationException {
428        if (authorizationEnabled && write && !isAdmin(user)) {
429            try {
430                // handle workflow jobs
431                if (jobId.endsWith("-W")) {
432                    WorkflowJobBean jobBean = null;
433                    JPAService jpaService = Services.get().get(JPAService.class);
434                    if (jpaService != null) {
435                        try {
436                            jobBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
437                        }
438                        catch (JPAExecutorException je) {
439                            throw new AuthorizationException(je);
440                        }
441                    }
442                    else {
443                        throw new AuthorizationException(ErrorCode.E0610);
444                    }
445                    if (jobBean != null && !jobBean.getUser().equals(user)) {
446                        if (!isUserInAcl(user, jobBean.getGroup())) {
447                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
448                            throw new AuthorizationException(ErrorCode.E0508, user, jobId);
449                        }
450                    }
451                }
452                // handle bundle jobs
453                else if (jobId.endsWith("-B")){
454                    BundleJobBean jobBean = null;
455                    JPAService jpaService = Services.get().get(JPAService.class);
456                    if (jpaService != null) {
457                        try {
458                            jobBean = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
459                        }
460                        catch (JPAExecutorException je) {
461                            throw new AuthorizationException(je);
462                        }
463                    }
464                    else {
465                        throw new AuthorizationException(ErrorCode.E0610);
466                    }
467                    if (jobBean != null && !jobBean.getUser().equals(user)) {
468                        if (!isUserInAcl(user, jobBean.getGroup())) {
469                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
470                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
471                        }
472                    }
473                }
474                // handle coordinator jobs
475                else {
476                    CoordinatorJobBean jobBean = null;
477                    JPAService jpaService = Services.get().get(JPAService.class);
478                    if (jpaService != null) {
479                        try {
480                            jobBean = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
481                        }
482                        catch (JPAExecutorException je) {
483                            throw new AuthorizationException(je);
484                        }
485                    }
486                    else {
487                        throw new AuthorizationException(ErrorCode.E0610);
488                    }
489                    if (jobBean != null && !jobBean.getUser().equals(user)) {
490                        if (!isUserInAcl(user, jobBean.getGroup())) {
491                            incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
492                            throw new AuthorizationException(ErrorCode.E0509, user, jobId);
493                        }
494                    }
495                }
496            }
497            catch (IOException ex) {
498                throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex);
499            }
500        }
501    }
502
503    /**
504     * Convenience method for instrumentation counters.
505     *
506     * @param name counter name.
507     * @param count count to increment the counter.
508     */
509    private void incrCounter(String name, int count) {
510        if (instrumentation != null) {
511            instrumentation.incr(INSTRUMENTATION_GROUP, name, count);
512        }
513    }
514}