/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.lib.service.hadoop;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

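/**
 * Service that provides access to Hadoop FileSystem instances on behalf of
 * proxied end users. FileSystem instances are cached per user and
 * reference-counted; idle instances are closed by a scheduled purger.
 * <p>
 * A minimal usage sketch, for illustration only (the {@code server} handle
 * and the user name are assumed here, not provided by this class):
 * <pre>
 *   FileSystemAccess fsAccess = server.get(FileSystemAccess.class);
 *   Configuration conf = fsAccess.getFileSystemConfiguration();
 *   Boolean exists = fsAccess.execute("alice", conf,
 *     new FileSystemAccess.FileSystemExecutor&lt;Boolean&gt;() {
 *       public Boolean execute(FileSystem fs) throws IOException {
 *         return fs.exists(new Path("/user/alice"));
 *       }
 *     });
 * </pre>
 */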
@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
  private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
  public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
  public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";

  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  public static final String HADOOP_CONF_DIR = "config.dir";

  private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";

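  /**
   * Reference-counted wrapper around a {@link FileSystem} instance, created
   * lazily on first use. On release, the instance is closed immediately if
   * the purge timeout is zero; otherwise it is left open and timestamped so
   * the scheduled purger can close it once it has been idle long enough.
   */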
  private static class CachedFileSystem {
    private FileSystem fs;
    private long lastUse;
    private long timeout;
    private int count;

    public CachedFileSystem(long timeout) {
      this.timeout = timeout;
      lastUse = -1;
      count = 0;
    }

    synchronized FileSystem getFileSystem(Configuration conf)
      throws IOException {
      if (fs == null) {
        fs = FileSystem.get(conf);
      }
      lastUse = -1;
      count++;
      return fs;
    }

    synchronized void release() throws IOException {
      count--;
      if (count == 0) {
        if (timeout == 0) {
          fs.close();
          fs = null;
          lastUse = -1;
        }
        else {
          lastUse = System.currentTimeMillis();
        }
      }
    }

    // To avoid race conditions in the cache map when adding/removing entries,
    // an entry remains in the cache forever; only its filesystem is
    // closed/reopened based on utilization. In the worst-case scenario, the
    // penalty we pay is that the number of cache entries equals the total
    // number of users in HDFS (which seems a reasonable overhead).
    synchronized boolean purgeIfIdle() throws IOException {
      boolean ret = false;
      if (count == 0 && lastUse != -1 &&
          (System.currentTimeMillis() - lastUse) > timeout) {
        fs.close();
        fs = null;
        lastUse = -1;
        ret = true;
      }
      return ret;
    }

  }

  public FileSystemAccessService() {
    super(PREFIX);
  }

  private Collection<String> nameNodeWhitelist;

  Configuration serviceHadoopConf;

  private AtomicInteger unmanagedFileSystems = new AtomicInteger();

  private ConcurrentHashMap<String, CachedFileSystem> fsCache =
    new ConcurrentHashMap<String, CachedFileSystem>();

  private long purgeTimeout;

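  /**
   * Initializes the service: configures {@link UserGroupInformation} for
   * 'simple' or 'kerberos' authentication (logging in from the keytab in the
   * Kerberos case), loads the Hadoop configuration from the configured
   * directory, and reads the NameNode whitelist.
   */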
  @Override
  protected void init() throws ServiceException {
    LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      } catch (IOException ex) {
        throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    } else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    } else {
      throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
    }

    String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
    File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
    if (!hadoopConfDir.exists()) {
      throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
    }
    try {
      serviceHadoopConf = loadHadoopConf(hadoopConfDir);
    } catch (IOException ex) {
      throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
    }

    LOG.debug("FileSystemAccess FileSystem configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
      LOG.debug("  {} = {}", entry.getKey(), entry.getValue());
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

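  /**
   * Loads the Hadoop configuration files ({@code core-site.xml} and
   * {@code hdfs-site.xml}) from the given directory, skipping any file that
   * does not exist.
   */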
  private Configuration loadHadoopConf(File dir) throws IOException {
    Configuration hadoopConf = new Configuration(false);
    for (String file : HADOOP_CONF_FILES) {
      File f = new File(dir, file);
      if (f.exists()) {
        hadoopConf.addResource(new Path(f.getAbsolutePath()));
      }
    }
    return hadoopConf;
  }

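  /**
   * Registers instrumentation for the number of unmanaged filesystems and,
   * if a positive purge timeout is configured, schedules the filesystem
   * cache purger at the configured frequency.
   */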
  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
    Scheduler scheduler = getServer().get(Scheduler.class);
    int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
    purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
    purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
    if (purgeTimeout > 0) {
      scheduler.schedule(new FileSystemCachePurger(),
                         purgeInterval, purgeInterval, TimeUnit.SECONDS);
    }
  }

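  /**
   * Periodic task that closes cached FileSystem instances that have been
   * idle longer than the purge timeout.
   */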
  private class FileSystemCachePurger implements Runnable {

    @Override
    public void run() {
      int count = 0;
      for (CachedFileSystem cacheFs : fsCache.values()) {
        try {
          count += cacheFs.purgeIfIdle() ? 1 : 0;
        } catch (Throwable ex) {
          LOG.warn("Error while purging filesystem: {}", ex.toString(), ex);
        }
      }
      LOG.debug("Purged [{}] filesystem instances", count);
    }
  }

  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return FileSystemAccess.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class, Scheduler.class};
  }

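  /**
   * Returns a proxy-user UGI for the given user, impersonated by the
   * service's login user.
   */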
  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

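  /**
   * Forces configuration settings this service requires; disables the HDFS
   * FileSystem cache, presumably so this service can manage instance
   * lifecycles itself.
   */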
  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

  private static final String HTTPFS_FS_USER = "httpfs.fs.user";

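  /**
   * Returns the cached FileSystem for the current UGI's short user name,
   * creating and caching it on first use. The user name is recorded in the
   * configuration under {@code httpfs.fs.user} so the instance can later be
   * released via {@link #closeFileSystem}.
   */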
  protected FileSystem createFileSystem(Configuration namenodeConf)
    throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
    CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
    if (cachedFS == null) {
      cachedFS = newCachedFS;
    }
    Configuration conf = new Configuration(namenodeConf);
    conf.set(HTTPFS_FS_USER, user);
    return cachedFS.getFileSystem(conf);
  }

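  /**
   * Releases the cached FileSystem associated with the {@code httpfs.fs.user}
   * recorded in the given FileSystem's configuration, decrementing its
   * reference count.
   */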
  protected void closeFileSystem(FileSystem fs) throws IOException {
    if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
      fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
    }
  }

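  /**
   * Validates the NameNode authority against the configured whitelist; an
   * empty whitelist or a '*' entry allows any NameNode.
   */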
  protected void validateNamenode(String namenode) throws FileSystemAccessException {
    if (nameNodeWhitelist.size() > 0 && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

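  /**
   * Health-check hook for the NameNode backing the given FileSystem; a no-op
   * here, intended to be overridden by subclasses.
   */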
  protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
  }

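  /**
   * Runs the given executor against a FileSystem obtained on behalf of the
   * given user. The configuration must come from
   * {@link #getFileSystemConfiguration} and define {@code fs.defaultFS}; the
   * FileSystem is released when the executor finishes, and the execution
   * time is recorded via instrumentation.
   */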
  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
        conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
                                          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).
          getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        @Override
        public T run() throws Exception {
          FileSystem fs = createFileSystem(conf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          } finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
    }
  }

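  /**
   * Like {@link #execute} but returns the FileSystem itself; the caller is
   * responsible for releasing it.
   */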
  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          return createFileSystem(conf);
        }
      });
    } catch (IOException ex) {
      throw ex;
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
    }
  }

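  /**
   * Creates an unmanaged FileSystem for the given user; it is tracked only
   * by a counter and must be returned via {@link #releaseFileSystem}.
   */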
  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
    FileSystemAccessException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

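  /**
   * Releases a FileSystem previously obtained via
   * {@link #createFileSystem(String, Configuration)}.
   */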
  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

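  /**
   * Returns a copy of the service's Hadoop configuration, marked as created
   * by this service and with the server-side umask cleared so permission
   * semantics match WebHDFS.
   */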
  @Override
  public Configuration getFileSystemConfiguration() {
    Configuration conf = new Configuration(true);
    ConfigurationUtils.copy(serviceHadoopConf, conf);
    conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);

    // Force-clear server-side umask to make HttpFS match WebHDFS behavior
    conf.set(FsPermission.UMASK_LABEL, "000");

    return conf;
  }

}