1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.SynchronousQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.DaemonThreadFactory;
34 import org.apache.hadoop.hbase.errorhandling.ForeignException;
35
36 import com.google.common.collect.MapMaker;
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class ProcedureMember implements Closeable {
49 private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
50
51 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
52
53 private final SubprocedureFactory builder;
54 private final ProcedureMemberRpcs rpcs;
55
56 private final ConcurrentMap<String,Subprocedure> subprocs =
57 new MapMaker().concurrencyLevel(4).weakValues().makeMap();
58 private final ExecutorService pool;
59
60
61
62
63
64
65
66
67 public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
68 SubprocedureFactory factory) {
69 this.pool = pool;
70 this.rpcs = rpcs;
71 this.builder = factory;
72 }
73
74
75
76
77
78
79
80 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
81 return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
82 }
83
84
85
86
87
88
89
90
91 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
92 long keepAliveMillis) {
93 return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
94 new SynchronousQueue<Runnable>(),
95 new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
96 }
97
98
99
100
101
102
103 ProcedureMemberRpcs getRpcs() {
104 return rpcs;
105 }
106
107
108
109
110
111
112
113
114
115
116 public Subprocedure createSubprocedure(String opName, byte[] data) {
117 return builder.buildSubprocedure(opName, data);
118 }
119
120
121
122
123
124
125
126
127 public boolean submitSubprocedure(Subprocedure subproc) {
128
129 if (subproc == null) {
130 LOG.warn("Submitted null subprocedure, nothing to run here.");
131 return false;
132 }
133
134 String procName = subproc.getName();
135 if (procName == null || procName.length() == 0) {
136 LOG.error("Subproc name cannot be null or the empty string");
137 return false;
138 }
139
140
141 Subprocedure rsub = subprocs.get(procName);
142 if (rsub != null) {
143 if (!rsub.isComplete()) {
144 LOG.error("Subproc '" + procName + "' is already running. Bailing out");
145 return false;
146 }
147 LOG.warn("A completed old subproc " + procName + " is still present, removing");
148 if (!subprocs.remove(procName, rsub)) {
149 LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
150 return false;
151 }
152 }
153
154 LOG.debug("Submitting new Subprocedure:" + procName);
155
156
157 try {
158 if (subprocs.putIfAbsent(procName, subproc) == null) {
159 this.pool.submit(subproc);
160 return true;
161 } else {
162 LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
163 return false;
164 }
165 } catch (RejectedExecutionException e) {
166 subprocs.remove(procName, subproc);
167
168
169 String msg = "Subprocedure pool is full!";
170 subproc.cancel(msg, e.getCause());
171 }
172
173 LOG.error("Failed to start subprocedure '" + procName + "'");
174 return false;
175 }
176
177
178
179
180
181 public void receivedReachedGlobalBarrier(String procName) {
182 Subprocedure subproc = subprocs.get(procName);
183 if (subproc == null) {
184 LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
185 return;
186 }
187 subproc.receiveReachedGlobalBarrier();
188 }
189
190
191
192
193 @Override
194 public void close() throws IOException {
195
196 pool.shutdownNow();
197 }
198
199
200
201
202
203
204
205 boolean closeAndWait(long timeoutMs) throws InterruptedException {
206 pool.shutdown();
207 return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
208 }
209
210
211
212
213
214
215
216
217
218
219
220 public void controllerConnectionFailure(final String message, final IOException cause) {
221 Collection<Subprocedure> toNotify = subprocs.values();
222 LOG.error(message, cause);
223 for (Subprocedure sub : toNotify) {
224
225 sub.cancel(message, cause);
226 }
227 }
228
229
230
231
232
233
234 public void receiveAbortProcedure(String procName, ForeignException ee) {
235 LOG.debug("Request received to abort procedure " + procName, ee);
236
237 Subprocedure sub = subprocs.get(procName);
238 if (sub == null) {
239 LOG.info("Received abort on procedure with no local subprocedure " + procName +
240 ", ignoring it.", ee);
241 return;
242 }
243 String msg = "Propagating foreign exception to subprocedure " + sub.getName();
244 LOG.error(msg, ee);
245 sub.cancel(msg, ee);
246 }
247 }