/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.lib.service.hadoop;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
  private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
  public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
  public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";

  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  public static final String HADOOP_CONF_DIR = "config.dir";

  private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";

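  /**
   * Reference-counted cache entry for a per-user {@link FileSystem}. The
   * filesystem is created lazily on first use; it is closed either when the
   * last user releases it (if the purge timeout is 0) or by the cache purger
   * once it has been idle longer than the configured timeout.
   */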
  private static class CachedFileSystem {
    private FileSystem fs;
    private long lastUse;
    private long timeout;
    private int count;

    public CachedFileSystem(long timeout) {
      this.timeout = timeout;
      lastUse = -1;
      count = 0;
    }

    synchronized FileSystem getFileSystem(Configuration conf)
        throws IOException {
      if (fs == null) {
        fs = FileSystem.get(conf);
      }
      lastUse = -1;
      count++;
      return fs;
    }

    synchronized void release() throws IOException {
      count--;
      if (count == 0) {
        if (timeout == 0) {
          fs.close();
          fs = null;
          lastUse = -1;
        } else {
          lastUse = System.currentTimeMillis();
        }
      }
    }

    // To avoid race conditions while adding and removing entries in the cache
    // map, an entry remains in the cache forever; it just closes/opens
    // filesystems based on their utilization. Worst case scenario, the penalty
    // we'll pay is that the number of entries in the cache will be the total
    // number of users in HDFS (which seems a reasonable overhead).
    synchronized boolean purgeIfIdle() throws IOException {
      boolean ret = false;
      if (count == 0 && lastUse != -1 &&
          (System.currentTimeMillis() - lastUse) > timeout) {
        fs.close();
        fs = null;
        lastUse = -1;
        ret = true;
      }
      return ret;
    }

  }

  public FileSystemAccessService() {
    super(PREFIX);
  }

  private Collection<String> nameNodeWhitelist;

  Configuration serviceHadoopConf;

  private AtomicInteger unmanagedFileSystems = new AtomicInteger();

  private ConcurrentHashMap<String, CachedFileSystem> fsCache =
      new ConcurrentHashMap<String, CachedFileSystem>();

  private long purgeTimeout;

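  /**
   * Initializes the service: configures {@link UserGroupInformation} for
   * Kerberos or simple authentication according to
   * {@link #AUTHENTICATION_TYPE}, loads <code>core-site.xml</code> and
   * <code>hdfs-site.xml</code> from the configured Hadoop configuration
   * directory, and reads the NameNode whitelist.
   */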
  @Override
  protected void init() throws ServiceException {
    LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      } catch (IOException ex) {
        throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    } else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    } else {
      throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
    }

    String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
    File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
    if (hadoopConfDir == null) {
      hadoopConfDir = new File(getServer().getConfigDir()).getAbsoluteFile();
    }
    if (!hadoopConfDir.exists()) {
      throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
    }
    try {
      serviceHadoopConf = loadHadoopConf(hadoopConfDir);
    } catch (IOException ex) {
      throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
    }

    LOG.debug("FileSystemAccess FileSystem configuration:");
    for (Map.Entry entry : serviceHadoopConf) {
      LOG.debug("  {} = {}", entry.getKey(), entry.getValue());
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

  private Configuration loadHadoopConf(File dir) throws IOException {
    Configuration hadoopConf = new Configuration(false);
    for (String file : HADOOP_CONF_FILES) {
      File f = new File(dir, file);
      if (f.exists()) {
        hadoopConf.addResource(new Path(f.getAbsolutePath()));
      }
    }
    return hadoopConf;
  }

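  /**
   * Exposes the number of unmanaged filesystems through the
   * {@link Instrumentation} service and, when a positive purge timeout is
   * configured, schedules a {@link FileSystemCachePurger} run at the
   * frequency (in seconds) given by {@link #FS_CACHE_PURGE_FREQUENCY}.
   */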
  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
    Scheduler scheduler = getServer().get(Scheduler.class);
    int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
    purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
    purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
    if (purgeTimeout > 0) {
      scheduler.schedule(new FileSystemCachePurger(),
                         purgeInterval, purgeInterval, TimeUnit.SECONDS);
    }
  }

  private class FileSystemCachePurger implements Runnable {

    @Override
    public void run() {
      int count = 0;
      for (CachedFileSystem cacheFs : fsCache.values()) {
        try {
          count += cacheFs.purgeIfIdle() ? 1 : 0;
        } catch (Throwable ex) {
          LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
        }
      }
      LOG.debug("Purged [{}] filesystem instances", count);
    }
  }

  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return FileSystemAccess.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class, Scheduler.class};
  }

  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

  private static final String HTTPFS_FS_USER = "httpfs.fs.user";

  protected FileSystem createFileSystem(Configuration namenodeConf)
      throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
    CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
    if (cachedFS == null) {
      cachedFS = newCachedFS;
    }
    Configuration conf = new Configuration(namenodeConf);
    conf.set(HTTPFS_FS_USER, user);
    return cachedFS.getFileSystem(conf);
  }

  protected void closeFileSystem(FileSystem fs) throws IOException {
    if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
      fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
    }
  }

  protected void validateNamenode(String namenode) throws FileSystemAccessException {
    if (nameNodeWhitelist.size() > 0 && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

  protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
  }

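  /**
   * Runs the given executor as the specified proxy user against a cached,
   * per-user filesystem, timing the call with an instrumentation Cron and
   * releasing the filesystem when the executor finishes.
   * <p>
   * A rough usage sketch; the <code>fsAccess</code> handle, user name and
   * path below are illustrative only and not part of this class:
   * <pre>
   *   Configuration conf = fsAccess.getFileSystemConfiguration();
   *   boolean exists = fsAccess.execute("alice", conf,
   *       new FileSystemAccess.FileSystemExecutor&lt;Boolean&gt;() {
   *         public Boolean execute(FileSystem fs) throws IOException {
   *           return fs.exists(new Path("/user/alice"));
   *         }
   *       });
   * </pre>
   */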
  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
      throws FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
        conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
    }
    try {
      validateNamenode(
          new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        @Override
        public T run() throws Exception {
          FileSystem fs = createFileSystem(conf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          } finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
    }
  }

  public FileSystem createFileSystemInternal(String user, final Configuration conf)
      throws IOException, FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    try {
      validateNamenode(
          new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          return createFileSystem(conf);
        }
      });
    } catch (IOException ex) {
      throw ex;
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
    }
  }

  @Override
  public FileSystem createFileSystem(String user, final Configuration conf)
      throws IOException, FileSystemAccessException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

  @Override
  public Configuration getFileSystemConfiguration() {
    Configuration conf = new Configuration(true);
    ConfigurationUtils.copy(serviceHadoopConf, conf);
    conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);

    // Force-clear server-side umask to make HttpFS match WebHDFS behavior
    conf.set(FsPermission.UMASK_LABEL, "000");

    return conf;
  }

}