1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.PrintStream;
22 import java.io.PrintWriter;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.util.ReflectionUtils;
36
37
38
39
40 @InterfaceAudience.Private
41 public class Threads {
42 protected static final Log LOG = LogFactory.getLog(Threads.class);
43 private static final AtomicInteger poolNumber = new AtomicInteger(1);
44
45
46
47
48
49 public static Thread setDaemonThreadRunning(final Thread t) {
50 return setDaemonThreadRunning(t, t.getName());
51 }
52
53
54
55
56
57
58
59 public static Thread setDaemonThreadRunning(final Thread t,
60 final String name) {
61 return setDaemonThreadRunning(t, name, null);
62 }
63
64
65
66
67
68
69
70
71
72 public static Thread setDaemonThreadRunning(final Thread t,
73 final String name, final UncaughtExceptionHandler handler) {
74 t.setName(name);
75 if (handler != null) {
76 t.setUncaughtExceptionHandler(handler);
77 }
78 t.setDaemon(true);
79 t.start();
80 return t;
81 }
82
83
84
85
86
87 public static void shutdown(final Thread t) {
88 shutdown(t, 0);
89 }
90
91
92
93
94
95
96 public static void shutdown(final Thread t, final long joinwait) {
97 if (t == null) return;
98 while (t.isAlive()) {
99 try {
100 t.join(joinwait);
101 } catch (InterruptedException e) {
102 LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
103 }
104 }
105 }
106
107
108
109
110
111
112
113 public static void threadDumpingIsAlive(final Thread t)
114 throws InterruptedException {
115 if (t == null) {
116 return;
117 }
118
119 while (t.isAlive()) {
120 t.join(60 * 1000);
121 if (t.isAlive()) {
122 printThreadInfo(System.out,
123 "Automatic Stack Trace every 60 seconds waiting on " +
124 t.getName());
125 }
126 }
127 }
128
129
130
131
132
133 public static void sleep(long millis) {
134 try {
135 Thread.sleep(millis);
136 } catch (InterruptedException e) {
137 e.printStackTrace();
138 Thread.currentThread().interrupt();
139 }
140 }
141
142
143
144
145
146
147 public static void sleepWithoutInterrupt(final long msToWait) {
148 long timeMillis = System.currentTimeMillis();
149 long endTime = timeMillis + msToWait;
150 boolean interrupted = false;
151 while (timeMillis < endTime) {
152 try {
153 Thread.sleep(endTime - timeMillis);
154 } catch (InterruptedException ex) {
155 interrupted = true;
156 }
157 timeMillis = System.currentTimeMillis();
158 }
159
160 if (interrupted) {
161 Thread.currentThread().interrupt();
162 }
163 }
164
165
166
167
168
169
170
171
172
173
174
175
176 public static ThreadPoolExecutor getBoundedCachedThreadPool(
177 int maxCachedThread, long timeout, TimeUnit unit,
178 ThreadFactory threadFactory) {
179 ThreadPoolExecutor boundedCachedThreadPool =
180 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
181 unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
182
183 boundedCachedThreadPool.allowCoreThreadTimeOut(true);
184 return boundedCachedThreadPool;
185 }
186
187
188
189
190
191
192
193
194 public static ThreadFactory getNamedThreadFactory(final String prefix) {
195 SecurityManager s = System.getSecurityManager();
196 final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
197 .getThreadGroup();
198
199 return new ThreadFactory() {
200 final AtomicInteger threadNumber = new AtomicInteger(1);
201 private final int poolNumber = Threads.poolNumber.getAndIncrement();
202 final ThreadGroup group = threadGroup;
203
204 @Override
205 public Thread newThread(Runnable r) {
206 final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
207 return new Thread(group, r, name);
208 }
209 };
210 }
211
212
213
214
215
216 public static ThreadFactory newDaemonThreadFactory(final String prefix) {
217 return newDaemonThreadFactory(prefix, null);
218 }
219
220
221
222
223
224
225
226
227 public static ThreadFactory newDaemonThreadFactory(final String prefix,
228 final UncaughtExceptionHandler handler) {
229 final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
230 return new ThreadFactory() {
231 @Override
232 public Thread newThread(Runnable r) {
233 Thread t = namedFactory.newThread(r);
234 if (handler != null) {
235 t.setUncaughtExceptionHandler(handler);
236 }
237 if (!t.isDaemon()) {
238 t.setDaemon(true);
239 }
240 if (t.getPriority() != Thread.NORM_PRIORITY) {
241 t.setPriority(Thread.NORM_PRIORITY);
242 }
243 return t;
244 }
245
246 };
247 }
248
249 private static Method printThreadInfoMethod = null;
250 private static boolean printThreadInfoMethodWithPrintStream = true;
251
252
253
254
255
256
257
258 public static void printThreadInfo(PrintStream stream, String title) {
259
260 if (printThreadInfoMethod == null) {
261 try {
262
263 printThreadInfoMethod = ReflectionUtils.class.getMethod("printThreadInfo",
264 PrintStream.class, String.class);
265 } catch (NoSuchMethodException e) {
266
267 printThreadInfoMethodWithPrintStream = false;
268 try {
269 printThreadInfoMethod = ReflectionUtils.class.getMethod("printThreadInfo",
270 PrintWriter.class, String.class);
271 } catch (NoSuchMethodException e1) {
272 throw new RuntimeException("Cannot find method. Check hadoop jars linked", e1);
273 }
274 }
275 printThreadInfoMethod.setAccessible(true);
276 }
277
278 try {
279 if (printThreadInfoMethodWithPrintStream) {
280 printThreadInfoMethod.invoke(null, stream, title);
281 } else {
282 printThreadInfoMethod.invoke(null, new PrintWriter(stream), title);
283 }
284 } catch (IllegalAccessException e) {
285 throw new RuntimeException(e.getCause());
286 } catch (IllegalArgumentException e) {
287 throw new RuntimeException(e.getCause());
288 } catch (InvocationTargetException e) {
289 throw new RuntimeException(e.getCause());
290 }
291 }
292 }