1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.handler;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.HRegionInfo;
28 import org.apache.hadoop.hbase.HTableDescriptor;
29 import org.apache.hadoop.hbase.Server;
30 import org.apache.hadoop.hbase.executor.EventHandler;
31 import org.apache.hadoop.hbase.executor.EventType;
32 import org.apache.hadoop.hbase.master.AssignmentManager;
33 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
34 import org.apache.hadoop.hbase.regionserver.HRegion;
35 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
36 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
37 import org.apache.hadoop.hbase.util.CancelableProgressable;
38 import org.apache.hadoop.hbase.util.ConfigUtil;
39 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
40 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41 import org.apache.zookeeper.KeeperException;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class OpenRegionHandler extends EventHandler {
49 private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
50
51 protected final RegionServerServices rsServices;
52
53 private final HRegionInfo regionInfo;
54 private final HTableDescriptor htd;
55
56 private boolean tomActivated;
57 private int assignmentTimeout;
58
59
60
61
62 private volatile int version = -1;
63
64 private volatile int versionOfOfflineNode = -1;
65
66 private final boolean useZKForAssignment;
67
/**
 * Creates a handler for an {@code M_RS_OPEN_REGION} event with {@code -1} as the expected
 * version of the offline znode.
 *
 * @param server the hosting server
 * @param rsServices region server services
 * @param regionInfo region to open
 * @param htd table descriptor of the region's table
 */
public OpenRegionHandler(final Server server,
    final RegionServerServices rsServices, HRegionInfo regionInfo,
    HTableDescriptor htd) {
  this(server, rsServices, regionInfo, htd, -1);
}

/**
 * Creates a handler for an {@code M_RS_OPEN_REGION} event.
 *
 * @param server the hosting server
 * @param rsServices region server services
 * @param regionInfo region to open
 * @param htd table descriptor of the region's table
 * @param versionOfOfflineNode expected version of the offline znode
 */
public OpenRegionHandler(final Server server,
    final RegionServerServices rsServices, HRegionInfo regionInfo,
    HTableDescriptor htd, int versionOfOfflineNode) {
  this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, versionOfOfflineNode);
}

/**
 * Common constructor: stores the collaborators and reads the assignment-related settings
 * from the server configuration.
 *
 * @param server the hosting server
 * @param rsServices region server services
 * @param regionInfo region to open
 * @param htd table descriptor of the region's table
 * @param eventType event type passed to the {@code EventHandler} super constructor
 * @param versionOfOfflineNode expected version of the offline znode
 */
protected OpenRegionHandler(final Server server,
    final RegionServerServices rsServices, final HRegionInfo regionInfo,
    final HTableDescriptor htd, EventType eventType,
    final int versionOfOfflineNode) {
  super(server, eventType);

  // Collaborators supplied by the caller.
  this.rsServices = rsServices;
  this.regionInfo = regionInfo;
  this.htd = htd;
  this.versionOfOfflineNode = versionOfOfflineNode;

  // Settings resolved once from the server configuration.
  this.tomActivated = this.server.getConfiguration().getBoolean(
      AssignmentManager.ASSIGNMENT_TIMEOUT_MANAGEMENT,
      AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
  this.assignmentTimeout = this.server.getConfiguration().getInt(
      AssignmentManager.ASSIGNMENT_TIMEOUT,
      AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT);
  this.useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
}
97
98 public HRegionInfo getRegionInfo() {
99 return regionInfo;
100 }
101
102 @Override
103 public void process() throws IOException {
104 boolean openSuccessful = false;
105 boolean transitionedToOpening = false;
106 final String regionName = regionInfo.getRegionNameAsString();
107 HRegion region = null;
108
109 try {
110 if (this.server.isStopped() || this.rsServices.isStopping()) {
111 return;
112 }
113 final String encodedName = regionInfo.getEncodedName();
114
115
116
117
118
119
120
121 if (this.rsServices.getFromOnlineRegions(encodedName) != null) {
122 LOG.error("Region " + encodedName +
123 " was already online when we started processing the opening. " +
124 "Marking this new attempt as failed");
125 return;
126 }
127
128
129
130
131 if (!isRegionStillOpening()){
132 LOG.error("Region " + encodedName + " opening cancelled");
133 return;
134 }
135
136 if (useZKForAssignment
137 && !transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
138 LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
139
140 return;
141 }
142 transitionedToOpening = true;
143
144
145 region = openRegion();
146 if (region == null) {
147 return;
148 }
149
150 boolean failed = true;
151 if (isRegionStillOpening() && (!useZKForAssignment || tickleOpening("post_region_open"))) {
152 if (updateMeta(region)) {
153 failed = false;
154 }
155 }
156 if (failed || this.server.isStopped() ||
157 this.rsServices.isStopping()) {
158 return;
159 }
160
161
162 if (!isRegionStillOpening() || (useZKForAssignment && !transitionToOpened(region))) {
163
164
165
166
167
168 return;
169 }
170
171
172
173
174
175
176
177
178
179
180
181 this.rsServices.addToOnlineRegions(region);
182 openSuccessful = true;
183
184
185 LOG.debug("Opened " + regionName + " on " +
186 this.server.getServerName());
187
188
189 } finally {
190
191 if (!openSuccessful) {
192 doCleanUpOnFailedOpen(region, transitionedToOpening);
193 }
194 final Boolean current = this.rsServices.getRegionsInTransitionInRS().
195 remove(this.regionInfo.getEncodedNameAsBytes());
196
197
198
199
200
201
202
203
204 if (openSuccessful) {
205 if (current == null) {
206 LOG.error("Bad state: we've just opened a region that was NOT in transition. Region="
207 + regionName);
208 } else if (Boolean.FALSE.equals(current)) {
209
210 LOG.error("Race condition: we've finished to open a region, while a close was requested "
211 + " on region=" + regionName + ". It can be a critical error, as a region that"
212 + " should be closed is now opened. Closing it now");
213 cleanupFailedOpen(region);
214 }
215 }
216 }
217 }
218
219 private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening)
220 throws IOException {
221 if (transitionedToOpening) {
222 try {
223 if (region != null) {
224 cleanupFailedOpen(region);
225 }
226 } finally {
227 if (!useZKForAssignment) {
228 rsServices.reportRegionStateTransition(TransitionCode.FAILED_OPEN, regionInfo);
229 } else {
230
231
232 tryTransitionFromOpeningToFailedOpen(regionInfo);
233 }
234 }
235 } else if (!useZKForAssignment) {
236 rsServices.reportRegionStateTransition(TransitionCode.FAILED_OPEN, regionInfo);
237 } else {
238
239
240 tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, versionOfOfflineNode);
241 }
242 }
243
244
245
246
247
248
249
250
251 boolean updateMeta(final HRegion r) {
252 if (this.server.isStopped() || this.rsServices.isStopping()) {
253 return false;
254 }
255
256
257 final AtomicBoolean signaller = new AtomicBoolean(false);
258 PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
259 this.server, this.rsServices, signaller);
260 t.start();
261
262
263 long timeout = assignmentTimeout * 10;
264 long now = System.currentTimeMillis();
265 long endTime = now + timeout;
266
267
268 long period = Math.max(1, assignmentTimeout/ 3);
269 long lastUpdate = now;
270 boolean tickleOpening = true;
271 while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
272 !this.rsServices.isStopping() && (endTime > now)) {
273 long elapsed = now - lastUpdate;
274 if (elapsed > period) {
275
276 lastUpdate = now;
277 if (useZKForAssignment) {
278 tickleOpening = tickleOpening("post_open_deploy");
279 }
280 }
281 synchronized (signaller) {
282 try {
283 if (!signaller.get()) signaller.wait(period);
284 } catch (InterruptedException e) {
285
286 }
287 }
288 now = System.currentTimeMillis();
289 }
290
291
292 if (t.isAlive()) {
293 if (!signaller.get()) {
294
295 LOG.debug("Interrupting thread " + t);
296 t.interrupt();
297 }
298 try {
299 t.join();
300 } catch (InterruptedException ie) {
301 LOG.warn("Interrupted joining " +
302 r.getRegionInfo().getRegionNameAsString(), ie);
303 Thread.currentThread().interrupt();
304 }
305 }
306
307
308
309
310 return ((!Thread.interrupted() && t.getException() == null) && tickleOpening);
311 }
312
313
314
315
316
317
318
319
320 static class PostOpenDeployTasksThread extends Thread {
321 private Throwable exception = null;
322 private final Server server;
323 private final RegionServerServices services;
324 private final HRegion region;
325 private final AtomicBoolean signaller;
326
327 PostOpenDeployTasksThread(final HRegion region, final Server server,
328 final RegionServerServices services, final AtomicBoolean signaller) {
329 super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
330 this.setDaemon(true);
331 this.server = server;
332 this.services = services;
333 this.region = region;
334 this.signaller = signaller;
335 }
336
337 public void run() {
338 try {
339 this.services.postOpenDeployTasks(this.region,
340 this.server.getCatalogTracker());
341 } catch (Throwable e) {
342 String msg =
343 "Exception running postOpenDeployTasks; region="
344 + this.region.getRegionInfo().getEncodedName();
345 this.exception = e;
346 if (e instanceof IOException && isRegionStillOpening(region.getRegionInfo(), services)) {
347 server.abort(msg, e);
348 } else {
349 LOG.warn(msg, e);
350 }
351 }
352
353 this.signaller.set(true);
354 synchronized (this.signaller) {
355 this.signaller.notify();
356 }
357 }
358
359
360
361
362 Throwable getException() {
363 return this.exception;
364 }
365 }
366
367
368
369
370
371
372
373 boolean transitionToOpened(final HRegion r) throws IOException {
374 boolean result = false;
375 HRegionInfo hri = r.getRegionInfo();
376 final String name = hri.getRegionNameAsString();
377
378 try {
379 if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri,
380 this.server.getServerName(), this.version) == -1) {
381 String warnMsg = "Completed the OPEN of region " + name +
382 " but when transitioning from " + " OPENING to OPENED ";
383 try {
384 String node = ZKAssign.getNodeName(this.server.getZooKeeper(), hri.getEncodedName());
385 if (ZKUtil.checkExists(this.server.getZooKeeper(), node) < 0) {
386
387 rsServices.abort(warnMsg + "the znode disappeared", null);
388 } else {
389 LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
390 "so now unassigning -- closing region on server: " + this.server.getServerName());
391 }
392 } catch (KeeperException ke) {
393 rsServices.abort(warnMsg, ke);
394 }
395 } else {
396 LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
397 " to OPENED in zk on " + this.server.getServerName());
398 result = true;
399 }
400 } catch (KeeperException e) {
401 LOG.error("Failed transitioning node " + name +
402 " from OPENING to OPENED -- closing region", e);
403 }
404 return result;
405 }
406
407
408
409
410
411
412 private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
413 boolean result = false;
414 final String name = hri.getRegionNameAsString();
415 try {
416 LOG.info("Opening of region " + hri + " failed, transitioning" +
417 " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
418 if (ZKAssign.transitionNode(
419 this.server.getZooKeeper(), hri,
420 this.server.getServerName(),
421 EventType.RS_ZK_REGION_OPENING,
422 EventType.RS_ZK_REGION_FAILED_OPEN,
423 this.version) == -1) {
424 LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
425 "It's likely that the master already timed out this open " +
426 "attempt, and thus another RS already has the region.");
427 } else {
428 result = true;
429 }
430 } catch (KeeperException e) {
431 LOG.error("Failed transitioning node " + name +
432 " from OPENING to FAILED_OPEN", e);
433 }
434 return result;
435 }
436
437
438
439
440
441
442
443
444
445
446
447
448 public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
449 final HRegionInfo hri, final int versionOfOfflineNode) {
450 boolean result = false;
451 final String name = hri.getRegionNameAsString();
452 try {
453 LOG.info("Opening of region " + hri + " failed, transitioning" +
454 " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
455 if (ZKAssign.transitionNode(
456 rsServices.getZooKeeper(), hri,
457 rsServices.getServerName(),
458 EventType.M_ZK_REGION_OFFLINE,
459 EventType.RS_ZK_REGION_FAILED_OPEN,
460 versionOfOfflineNode) == -1) {
461 LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
462 "It's likely that the master already timed out this open " +
463 "attempt, and thus another RS already has the region.");
464 } else {
465 result = true;
466 }
467 } catch (KeeperException e) {
468 LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
469 }
470 return result;
471 }
472
473
474
475
476
477 HRegion openRegion() {
478 HRegion region = null;
479 try {
480
481
482 region = HRegion.openHRegion(this.regionInfo, this.htd,
483 this.rsServices.getWAL(this.regionInfo),
484 this.server.getConfiguration(),
485 this.rsServices,
486 new CancelableProgressable() {
487 public boolean progress() {
488 if (useZKForAssignment) {
489
490
491
492
493 return tickleOpening("open_region_progress");
494 }
495 if (!isRegionStillOpening()) {
496 LOG.warn("Open region aborted since it isn't opening any more");
497 return false;
498 }
499 return true;
500 }
501 });
502 } catch (Throwable t) {
503
504
505
506 LOG.error(
507 "Failed open of region=" + this.regionInfo.getRegionNameAsString()
508 + ", starting to roll back the global memstore size.", t);
509
510 if (this.rsServices != null) {
511 RegionServerAccounting rsAccounting =
512 this.rsServices.getRegionServerAccounting();
513 if (rsAccounting != null) {
514 rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
515 }
516 }
517 }
518 return region;
519 }
520
521 void cleanupFailedOpen(final HRegion region) throws IOException {
522 if (region != null) {
523 byte[] encodedName = regionInfo.getEncodedNameAsBytes();
524 try {
525 rsServices.getRegionsInTransitionInRS().put(encodedName,Boolean.FALSE);
526 this.rsServices.removeFromOnlineRegions(region, null);
527 region.close();
528 } finally {
529 rsServices.getRegionsInTransitionInRS().remove(encodedName);
530 }
531 }
532 }
533
534 private static boolean isRegionStillOpening(HRegionInfo regionInfo,
535 RegionServerServices rsServices) {
536 byte[] encodedName = regionInfo.getEncodedNameAsBytes();
537 Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
538 return Boolean.TRUE.equals(action);
539 }
540
541 private boolean isRegionStillOpening() {
542 return isRegionStillOpening(regionInfo, rsServices);
543 }
544
545
546
547
548
549
550
551
552
553 boolean transitionZookeeperOfflineToOpening(final String encodedName,
554 int versionOfOfflineNode) {
555
556 try {
557
558 this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
559 server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
560 EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
561 } catch (KeeperException e) {
562 LOG.error("Error transition from OFFLINE to OPENING for region=" +
563 encodedName, e);
564 this.version = -1;
565 return false;
566 }
567 boolean b = isGoodVersion();
568 if (!b) {
569 LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
570 encodedName);
571 }
572 return b;
573 }
574
575
576
577
578
579
580
581 boolean tickleOpening(final String context) {
582 if (!isRegionStillOpening()) {
583 LOG.warn("Open region aborted since it isn't opening any more");
584 return false;
585 }
586
587 if (!isGoodVersion()) return false;
588 String encodedName = this.regionInfo.getEncodedName();
589 try {
590 this.version =
591 ZKAssign.retransitionNodeOpening(server.getZooKeeper(),
592 this.regionInfo, this.server.getServerName(), this.version, tomActivated);
593 } catch (KeeperException e) {
594 server.abort("Exception refreshing OPENING; region=" + encodedName +
595 ", context=" + context, e);
596 this.version = -1;
597 return false;
598 }
599 boolean b = isGoodVersion();
600 if (!b) {
601 LOG.warn("Failed refreshing OPENING; region=" + encodedName +
602 ", context=" + context);
603 }
604 return b;
605 }
606
607 private boolean isGoodVersion() {
608 return this.version != -1;
609 }
610 }