1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.locks.Lock;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Chore;
29 import org.apache.hadoop.hbase.Stoppable;
30 import org.apache.hadoop.hbase.client.HBaseAdmin;
31 import org.apache.hadoop.hbase.client.HConnection;
32 import org.apache.hadoop.hbase.client.HConnectionManager;
33 import org.apache.hadoop.hbase.client.HTableInterface;
34 import org.apache.hadoop.hbase.security.User;
35 import org.apache.hadoop.hbase.security.UserProvider;
36 import org.apache.hadoop.security.UserGroupInformation;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45 @InterfaceAudience.Private
46 public class ConnectionCache {
47 private static Logger LOG = Logger.getLogger(ConnectionCache.class);
48
49 private final Map<String, ConnectionInfo>
50 connections = new ConcurrentHashMap<String, ConnectionInfo>();
51 private final KeyLocker<String> locker = new KeyLocker<String>();
52 private final String realUserName;
53 private final UserGroupInformation realUser;
54 private final UserProvider userProvider;
55 private final Configuration conf;
56
57 private final ThreadLocal<String> effectiveUserNames =
58 new ThreadLocal<String>() {
59 protected String initialValue() {
60 return realUserName;
61 }
62 };
63
64 public ConnectionCache(final Configuration conf,
65 final UserProvider userProvider,
66 final int cleanInterval, final int maxIdleTime) throws IOException {
67 Stoppable stoppable = new Stoppable() {
68 private volatile boolean isStopped = false;
69 @Override public void stop(String why) { isStopped = true;}
70 @Override public boolean isStopped() {return isStopped;}
71 };
72
73 Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) {
74 @Override
75 protected void chore() {
76 for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
77 ConnectionInfo connInfo = entry.getValue();
78 if (connInfo.timedOut(maxIdleTime)) {
79 if (connInfo.admin != null) {
80 try {
81 connInfo.admin.close();
82 } catch (Throwable t) {
83 LOG.info("Got exception in closing idle admin", t);
84 }
85 }
86 try {
87 connInfo.connection.close();
88 } catch (Throwable t) {
89 LOG.info("Got exception in closing idle connection", t);
90 }
91 }
92 }
93 }
94 };
95
96 Threads.setDaemonThreadRunning(cleaner.getThread());
97 this.realUser = userProvider.getCurrent().getUGI();
98 this.realUserName = realUser.getShortUserName();
99 this.userProvider = userProvider;
100 this.conf = conf;
101 }
102
103
104
105
106 public void setEffectiveUser(String user) {
107 effectiveUserNames.set(user);
108 }
109
110
111
112
113 public String getEffectiveUser() {
114 return effectiveUserNames.get();
115 }
116
117
118
119
120
121 @SuppressWarnings("deprecation")
122 public HBaseAdmin getAdmin() throws IOException {
123 ConnectionInfo connInfo = getCurrentConnection();
124 if (connInfo.admin == null) {
125 Lock lock = locker.acquireLock(getEffectiveUser());
126 try {
127 if (connInfo.admin == null) {
128 connInfo.admin = new HBaseAdmin(connInfo.connection);
129 }
130 } finally {
131 lock.unlock();
132 }
133 }
134 return connInfo.admin;
135 }
136
137
138
139
140 public HTableInterface getTable(String tableName) throws IOException {
141 ConnectionInfo connInfo = getCurrentConnection();
142 return connInfo.connection.getTable(tableName);
143 }
144
145
146
147
148
149 ConnectionInfo getCurrentConnection() throws IOException {
150 String userName = getEffectiveUser();
151 ConnectionInfo connInfo = connections.get(userName);
152 if (connInfo == null || !connInfo.updateAccessTime()) {
153 Lock lock = locker.acquireLock(userName);
154 try {
155 connInfo = connections.get(userName);
156 if (connInfo == null) {
157 UserGroupInformation ugi = realUser;
158 if (!userName.equals(realUserName)) {
159 ugi = UserGroupInformation.createProxyUser(userName, realUser);
160 }
161 User user = userProvider.create(ugi);
162 HConnection conn = HConnectionManager.createConnection(conf, user);
163 connInfo = new ConnectionInfo(conn, userName);
164 connections.put(userName, connInfo);
165 }
166 } finally {
167 lock.unlock();
168 }
169 }
170 return connInfo;
171 }
172
173 class ConnectionInfo {
174 final HConnection connection;
175 final String userName;
176
177 volatile HBaseAdmin admin;
178 private long lastAccessTime;
179 private boolean closed;
180
181 ConnectionInfo(HConnection conn, String user) {
182 lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
183 connection = conn;
184 closed = false;
185 userName = user;
186 }
187
188 synchronized boolean updateAccessTime() {
189 if (closed) {
190 return false;
191 }
192 if (connection.isAborted() || connection.isClosed()) {
193 LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
194 connections.remove(userName);
195 return false;
196 }
197 lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
198 return true;
199 }
200
201 synchronized boolean timedOut(int maxIdleTime) {
202 long timeoutTime = lastAccessTime + maxIdleTime;
203 if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
204 connections.remove(userName);
205 closed = true;
206 }
207 return false;
208 }
209 }
210 }