1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.NavigableMap;
32 import java.util.Set;
33 import java.util.TreeMap;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentSkipListSet;
36 import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.hbase.classification.InterfaceAudience;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.Chore;
51 import org.apache.hadoop.hbase.HBaseIOException;
52 import org.apache.hadoop.hbase.HConstants;
53 import org.apache.hadoop.hbase.HRegionInfo;
54 import org.apache.hadoop.hbase.NotServingRegionException;
55 import org.apache.hadoop.hbase.RegionTransition;
56 import org.apache.hadoop.hbase.Server;
57 import org.apache.hadoop.hbase.ServerName;
58 import org.apache.hadoop.hbase.Stoppable;
59 import org.apache.hadoop.hbase.TableName;
60 import org.apache.hadoop.hbase.TableNotFoundException;
61 import org.apache.hadoop.hbase.catalog.CatalogTracker;
62 import org.apache.hadoop.hbase.catalog.MetaReader;
63 import org.apache.hadoop.hbase.client.Result;
64 import org.apache.hadoop.hbase.exceptions.DeserializationException;
65 import org.apache.hadoop.hbase.executor.EventHandler;
66 import org.apache.hadoop.hbase.executor.EventType;
67 import org.apache.hadoop.hbase.executor.ExecutorService;
68 import org.apache.hadoop.hbase.ipc.RpcClient;
69 import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
70 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
71 import org.apache.hadoop.hbase.master.RegionState.State;
72 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
73 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
74 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
75 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
76 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
77 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
78 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
79 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
80 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
81 import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
82 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
83 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
84 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
85 import org.apache.hadoop.hbase.regionserver.wal.HLog;
86 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
87 import org.apache.hadoop.hbase.util.ConfigUtil;
88 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89 import org.apache.hadoop.hbase.util.FSUtils;
90 import org.apache.hadoop.hbase.util.KeyLocker;
91 import org.apache.hadoop.hbase.util.Pair;
92 import org.apache.hadoop.hbase.util.PairOfSameType;
93 import org.apache.hadoop.hbase.util.Threads;
94 import org.apache.hadoop.hbase.util.Triple;
95 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
96 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
97 import org.apache.hadoop.hbase.zookeeper.ZKTable;
98 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
99 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
100 import org.apache.hadoop.ipc.RemoteException;
101 import org.apache.zookeeper.AsyncCallback;
102 import org.apache.zookeeper.KeeperException;
103 import org.apache.zookeeper.KeeperException.NoNodeException;
104 import org.apache.zookeeper.KeeperException.NodeExistsException;
105 import org.apache.zookeeper.data.Stat;
106
107 import com.google.common.annotations.VisibleForTesting;
108 import com.google.common.base.Preconditions;
109 import com.google.common.collect.LinkedHashMultimap;
110
111
112
113
114
115
116
117
118 @InterfaceAudience.Private
119 public class AssignmentManager extends ZooKeeperListener {
// Logger shared by all instances of this class.
120 private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
121
// Sentinel server name built from HConstants.HBCK_CODE_NAME with -1 for both numeric arguments.
// handleRegion() hands any transition whose server equals this value to handleHBCK().
122 public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
123 -1, -1L);
124
// Configuration key whose value is passed as the last argument of the TimeoutMonitor constructor,
// with its default (presumably milliseconds -- TODO confirm unit).
125 public static final String ASSIGNMENT_TIMEOUT = "hbase.master.assignment.timeoutmonitor.timeout";
126 public static final int DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT = 600000;
// Configuration key read into tomActivated by the constructor, with its default.
127 public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
128 public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
129
// Configuration key and default for an "already in transition" wait time; not read in the part of
// the class shown here.
130 public static final String ALREADY_IN_TRANSITION_WAITTIME
131 = "hbase.assignment.already.intransition.waittime";
132 public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000;
133
// The server hosting this manager; source of configuration, ZooKeeper handle and server name.
134 protected final Server server;
135
// Collaborator used to query online/dead servers and to expire servers.
136 private ServerManager serverManager;
137
// True when the configured load balancer class is FavoredNodeLoadBalancer (set in the constructor).
138 private boolean shouldAssignRegionsWithFavoredNodes;
139
// Catalog access handle supplied to the constructor; passed to MetaReader and the favored-node helper.
140 private CatalogTracker catalogTracker;
141
// Created only when tomActivated is true; null otherwise. Started by startTimeOutMonitor().
142 protected final TimeoutMonitor timeoutMonitor;
143
// Created and started as a daemon thread by the constructor only when tomActivated is true;
// null otherwise.
144 private final TimerUpdater timerUpdater;
145
// Load balancer supplied to the constructor.
146 private LoadBalancer balancer;
147
// Metrics sink instantiated by the constructor.
148 private final MetricsAssignmentManager metricsAssignmentManager;
149
// Table lock manager supplied to the constructor.
150 private final TableLockManager tableLockManager;
151
// Counter starting at zero; not updated in the part of the class shown here.
152 private AtomicInteger numRegionsOpened = new AtomicInteger(0);
153
// Per-key lock provider; in this file the key is always an encoded region name.
154 final private KeyLocker<String> locker = new KeyLocker<String>();
155
156
157
158
159
// Encoded region name -> region, wrapped in Collections.synchronizedMap by the constructor.
// Filled by setRegionsToReopen(), drained by removeClosedRegion(), consulted by getReopenStatus().
160 private final Map <String, HRegionInfo> regionsToReopen;
161
162
163
164
165
// From "hbase.assignment.maximum.attempts" (default 10), clamped to at least 1. handleRegion()
// compares the per-region FAILED_OPEN count against it.
166 private final int maximumAttempts;
167
168
169
170
// Bookkeeping for in-flight merges, keyed by a String; not accessed in the part of the class
// shown here.
171 private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172 = new HashMap<String, PairOfSameType<HRegionInfo>>();
173
// Bookkeeping for in-flight splits, keyed by region; not accessed in the part of the class shown here.
174 private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175 = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176
177
178
179
180
// From "hbase.meta.assignment.retry.sleeptime" (default 1000).
181 private final long sleepTimeBeforeRetryingMetaAssignment;
182
183
184
185
186
// Encoded region name -> plan. addPlan()/addPlans() mutate it while synchronized on the map itself.
187 final NavigableMap<String, RegionPlan> regionPlans =
188 new TreeMap<String, RegionPlan>();
189
// ZooKeeper-backed table state helper built on this listener's watcher.
190 private final ZKTable zkTable;
191
192
193
194
195
// Allocated only when tomActivated is true; null otherwise.
196 private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
197
// HBase executor service supplied to the constructor; recovery EventHandlers are submitted to it.
198 private final ExecutorService executorService;
199
200
// Null until initializeHandlerTrackers() is called; then records, per region, that the
// ClosedRegionHandler ran (see updateClosedRegionHandlerTracker()).
201 private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
202
203
// Null until initializeHandlerTrackers() is called; then records, per region, that the
// OpenedRegionHandler ran (see updateOpenedRegionHandlerTracker()).
204 private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
205
206
// Bounded cached pool of daemon threads with name prefix "AM.", sized by "hbase.assignment.threads.max"
// (default 30).
207 private java.util.concurrent.ExecutorService threadPoolExecutorService;
208
209
// Bounded cached pool of daemon threads with name prefix "AM.ZK.Worker", sized by
// "hbase.assignment.zkevent.workers" (default 20).
210 private final java.util.concurrent.ExecutorService zkEventWorkers;
211
// Event types that handleRegion() still processes when the originating server is not online.
212 private List<EventType> ignoreStatesRSOffline = Arrays.asList(
213 EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
214
// In-memory region state bookkeeping, created by the constructor.
215 private final RegionStates regionStates;
216
217
218
219
220
// From "hbase.bulk.assignment.threshold.regions" (default 7) and
// "hbase.bulk.assignment.threshold.servers" (default 3).
221 private final int bulkAssignThresholdRegions;
222 private final int bulkAssignThresholdServers;
223
224
225
226
// From "hbase.bulk.assignment.waittillallassigned" (default false).
227 private final boolean bulkAssignWaitTillAllAssigned;
228
229
230
231
232
233
234
235
236
// Flipped to true by failoverCleanupDone(); read through isFailoverCleanupDone().
237 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
238
239
// Whether timeout management is enabled (ASSIGNMENT_TIMEOUT_MANAGEMENT); gates creation of
// timeoutMonitor, timerUpdater and serversInUpdatingTimer.
240 private final boolean tomActivated;
241
242
243
244
245
246
247
248
// Encoded region name -> number of FAILED_OPEN events seen. handleRegion() removes the entry when
// the count reaches maximumAttempts or when the region opens.
249 private final ConcurrentHashMap<String, AtomicInteger>
250 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
251
252
// Result of ConfigUtil.useZKForAssignment(conf), captured by the constructor.
253 private final boolean useZKForAssignment;
254
255
256
// Region state store created by the constructor and started in
// processDeadServersAndRegionsInTransition().
257 private final RegionStateStore regionStateStore;
258
259
260
261
// Mutable public static flag; not read in the part of the class shown here (presumably a test
// hook, judging by its name -- TODO confirm).
262 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
263 public static boolean TEST_SKIP_SPLIT_HANDLING = false;
264
265
// Registered assignment listeners, held in a CopyOnWriteArrayList.
266 private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
267
268
269
270
271
272
273
274
275
276
277
278 public AssignmentManager(Server server, ServerManager serverManager,
279 CatalogTracker catalogTracker, final LoadBalancer balancer,
280 final ExecutorService service, MetricsMaster metricsMaster,
281 final TableLockManager tableLockManager) throws KeeperException, IOException {
282 super(server.getZooKeeper());
283 this.server = server;
284 this.serverManager = serverManager;
285 this.catalogTracker = catalogTracker;
286 this.executorService = service;
287 this.regionStateStore = new RegionStateStore(server);
288 this.regionsToReopen = Collections.synchronizedMap
289 (new HashMap<String, HRegionInfo> ());
290 Configuration conf = server.getConfiguration();
291
292 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294 FavoredNodeLoadBalancer.class);
295 this.tomActivated = conf.getBoolean(
296 ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
297 if (tomActivated){
298 this.serversInUpdatingTimer = new ConcurrentSkipListSet<ServerName>();
299 this.timeoutMonitor = new TimeoutMonitor(
300 conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
301 server, serverManager,
302 conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT));
303 this.timerUpdater = new TimerUpdater(conf.getInt(
304 "hbase.master.assignment.timerupdater.period", 10000), server);
305 Threads.setDaemonThreadRunning(timerUpdater.getThread(),
306 server.getServerName() + ".timerUpdater");
307 } else {
308 this.serversInUpdatingTimer = null;
309 this.timeoutMonitor = null;
310 this.timerUpdater = null;
311 }
312 this.zkTable = new ZKTable(this.watcher);
313
314 this.maximumAttempts = Math.max(1,
315 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
316 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
317 "hbase.meta.assignment.retry.sleeptime", 1000l);
318 this.balancer = balancer;
319 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
320 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
321 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
322 this.regionStates = new RegionStates(server, serverManager, regionStateStore);
323
324 this.bulkAssignWaitTillAllAssigned =
325 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
326 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
327 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
328
329 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
330 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
331 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
332 TimeUnit.SECONDS, threadFactory);
333 this.tableLockManager = tableLockManager;
334
335 this.metricsAssignmentManager = new MetricsAssignmentManager();
336 useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
337 }
338
339 void startTimeOutMonitor() {
340 if (tomActivated) {
341 Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
342 + ".timeoutMonitor");
343 }
344 }
345
346
347
348
349
350 public void registerListener(final AssignmentListener listener) {
351 this.listeners.add(listener);
352 }
353
354
355
356
357
358 public boolean unregisterListener(final AssignmentListener listener) {
359 return this.listeners.remove(listener);
360 }
361
362
363
364
365 public ZKTable getZKTable() {
366
367
368 return this.zkTable;
369 }
370
371
372
373
374
375
376
377 public RegionStates getRegionStates() {
378 return regionStates;
379 }
380
381
382
383
384 @VisibleForTesting
385 RegionStateStore getRegionStateStore() {
386 return regionStateStore;
387 }
388
389 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
390 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
391 }
392
393
394
395
396
397
398 public void addPlan(String encodedName, RegionPlan plan) {
399 synchronized (regionPlans) {
400 regionPlans.put(encodedName, plan);
401 }
402 }
403
404
405
406
407 public void addPlans(Map<String, RegionPlan> plans) {
408 synchronized (regionPlans) {
409 regionPlans.putAll(plans);
410 }
411 }
412
413
414
415
416
417
418
419
420 public void setRegionsToReopen(List <HRegionInfo> regions) {
421 for(HRegionInfo hri : regions) {
422 regionsToReopen.put(hri.getEncodedName(), hri);
423 }
424 }
425
426
427
428
429
430
431
432
433 public Pair<Integer, Integer> getReopenStatus(TableName tableName)
434 throws IOException {
435 List <HRegionInfo> hris =
436 MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
437 Integer pending = 0;
438 for (HRegionInfo hri : hris) {
439 String name = hri.getEncodedName();
440
441 if (regionsToReopen.containsKey(name)
442 || regionStates.isRegionInTransition(name)) {
443 pending++;
444 }
445 }
446 return new Pair<Integer, Integer>(pending, hris.size());
447 }
448
449
450
451
452
453
454 public boolean isFailoverCleanupDone() {
455 return failoverCleanupDone.get();
456 }
457
458
459
460
461
462 public Lock acquireRegionLock(final String encodedName) {
463 return locker.acquireLock(encodedName);
464 }
465
466
467
468
469
470 void failoverCleanupDone() {
471 failoverCleanupDone.set(true);
472 serverManager.processQueuedDeadServers();
473 }
474
475
476
477
478
479
480
481
482 void joinCluster() throws IOException,
483 KeeperException, InterruptedException {
484 long startTime = System.currentTimeMillis();
485
486
487
488
489
490
491
492
493
494
495 Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
496
497
498
499
500 boolean failover = processDeadServersAndRegionsInTransition(deadServers);
501
502 if (!useZKForAssignment) {
503
504 ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
505 }
506 recoverTableInDisablingState();
507 recoverTableInEnablingState();
508 LOG.info("Joined the cluster in " + (System.currentTimeMillis()
509 - startTime) + "ms, failover=" + failover);
510 }
511
512
513
514
515
516
517
518
519
520
521
522
/**
 * Decides whether this master is starting on a clean cluster or taking over a running one
 * (failover), then runs the matching start-up path.
 *
 * Failover is presumed as soon as one of these checks succeeds, evaluated in this order:
 *  1. the server manager reports dead servers;
 *  2. a region not belonging to the meta table is recorded as assigned to an online server;
 *  3. an assignment znode exists for a non-meta region known to regionStates;
 *  4. (only when ZooKeeper is not used for assignment) a non-meta region in transition sits on an
 *     online server;
 *  5. a requeued dead server still has a log directory or a splitting directory on the filesystem.
 *
 * @param deadServers handed to processDeadServersAndRecoverLostRegions() on failover
 * @return true if failover was presumed; also true after aborting the server because the
 *   assignment znode children could not be listed
 */
523 boolean processDeadServersAndRegionsInTransition(
524 final Map<ServerName, List<HRegionInfo>> deadServers)
525 throws KeeperException, IOException, InterruptedException {
526 List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
527 watcher.assignmentZNode);
528
// With ZK-based assignment a missing child list is fatal: abort the server and report failover.
529 if (nodes == null && useZKForAssignment) {
530 String errorMessage = "Failed to get the children from ZK";
531 server.abort(errorMessage, new IOException(errorMessage));
532 return true;
533 }
534
// Check 1: any dead server means failover.
535 boolean failover = !serverManager.getDeadServers().isEmpty();
536 if (failover) {
537
538 if (LOG.isDebugEnabled()) {
539 LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
540 }
541 } else {
542
// Check 2: a region outside the meta table already hosted by an online server.
543 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
544 for (Map.Entry<HRegionInfo, ServerName> en : regionStates.getRegionAssignments().entrySet()) {
545 HRegionInfo hri = en.getKey();
546 if (!hri.isMetaTable() && onlineServers.contains(en.getValue())) {
547 LOG.debug("Found " + hri + " out on cluster");
548 failover = true;
549 break;
550 }
551 }
552 }
553
554 if (!failover && nodes != null) {
555
// Check 3: an assignment znode for a known non-meta region.
556 for (String encodedName : nodes) {
557 RegionState regionState = regionStates.getRegionState(encodedName);
558 if (regionState != null && !regionState.getRegion().isMetaRegion()) {
559 LOG.debug("Found " + regionState + " in RITs");
560 failover = true;
561 break;
562 }
563 }
564 }
565
566 if (!failover && !useZKForAssignment) {
567
// Check 4 (ZK-less mode only): a non-meta region in transition on an online server.
568 Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
569 if (!regionsInTransition.isEmpty()) {
570 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
571 for (RegionState regionState : regionsInTransition.values()) {
572 ServerName serverName = regionState.getServerName();
573 if (!regionState.getRegion().isMetaRegion()
574 && serverName != null && onlineServers.contains(serverName)) {
575 LOG.debug("Found " + regionState + " in RITs");
576 failover = true;
577 break;
578 }
579 }
580 }
581 }
582
583 if (!failover) {
584
585
586
587
// Check 5: a requeued dead server that still has a log directory, or the same directory with
// the splitting suffix, under the root directory.
588 Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
589 if (!queuedDeadServers.isEmpty()) {
590 Configuration conf = server.getConfiguration();
591 Path rootdir = FSUtils.getRootDir(conf);
592 FileSystem fs = rootdir.getFileSystem(conf);
593 for (ServerName serverName : queuedDeadServers) {
594 Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
595 Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
596 if (fs.exists(logDir) || fs.exists(splitDir)) {
597 LOG.debug("Found queued dead server " + serverName);
598 failover = true;
599 break;
600 }
601 }
602 if (!failover) {
603
604
// None of the requeued dead servers left logs behind, so they are dropped from the queue.
605 LOG.info("AM figured that it's not a failover and cleaned up " + queuedDeadServers.size()
606 + " queued dead servers");
607 serverManager.removeRequeuedDeadServers();
608 }
609 }
610 }
611
// Clean start-up only: gather the disabled, disabling and enabling tables from ZooKeeper and have
// regionStates close all user regions, passing that set along. The set stays null on failover.
612 Set<TableName> disabledOrDisablingOrEnabling = null;
613 if (!failover) {
614 disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
615 disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
616
617
618 regionStates.closeAllUserRegions(disabledOrDisablingOrEnabling);
619 }
620
621
// The region state store is started only after the failover decision has been made.
622 regionStateStore.start();
623
624
625 if (failover) {
626 LOG.info("Found regions out on cluster or in RIT; presuming failover");
627
628
629 processDeadServersAndRecoverLostRegions(deadServers);
630 }
631 if (!failover && useZKForAssignment) {
632
// Clean start-up with ZK-based assignment: wipe the assignment znodes, then list the children of
// the assignment znode again (the helper's name indicates it also sets a watch for new children).
633 ZKAssign.deleteAllNodes(watcher);
634 ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.assignmentZNode);
635 }
636
637
638
639
// Sets the failover-cleanup flag and lets the server manager process its queued dead servers.
640 failoverCleanupDone();
641 if (!failover) {
642
643 LOG.info("Clean cluster startup. Assigning user regions");
644 assignAllUserRegions(disabledOrDisablingOrEnabling);
645 }
646 return failover;
647 }
648
649
650
651
652
653
654
655
656
657
658
659
660 boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
661 throws InterruptedException, KeeperException, IOException {
662 String encodedRegionName = hri.getEncodedName();
663 if (!processRegionInTransition(encodedRegionName, hri)) {
664 return false;
665 }
666 LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
667 while (!this.server.isStopped() &&
668 this.regionStates.isRegionInTransition(encodedRegionName)) {
669 RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
670 if (state == null || !serverManager.isServerOnline(state.getServerName())) {
671
672
673
674 break;
675 }
676 this.regionStates.waitForUpdate(100);
677 }
678 return true;
679 }
680
681
682
683
684
685
686
687
688
689
690 boolean processRegionInTransition(final String encodedRegionName,
691 final HRegionInfo regionInfo) throws KeeperException, IOException {
692
693
694
695
696 Lock lock = locker.acquireLock(encodedRegionName);
697 try {
698 Stat stat = new Stat();
699 byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
700 if (data == null) return false;
701 RegionTransition rt;
702 try {
703 rt = RegionTransition.parseFrom(data);
704 } catch (DeserializationException e) {
705 LOG.warn("Failed parse znode data", e);
706 return false;
707 }
708 HRegionInfo hri = regionInfo;
709 if (hri == null) {
710
711
712
713
714
715 hri = regionStates.getRegionInfo(rt.getRegionName());
716 EventType et = rt.getEventType();
717 if (hri == null && et != EventType.RS_ZK_REGION_MERGING
718 && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
719 LOG.warn("Couldn't find the region in recovering " + rt);
720 return false;
721 }
722 }
723 return processRegionsInTransition(
724 rt, hri, stat.getVersion());
725 } finally {
726 lock.unlock();
727 }
728 }
729
730
731
732
733
734
735
736
737
/**
 * Processes one region transition read from an assignment znode during recovery.
 *
 * Nothing is done when the region is already in transition and it is either the meta region or
 * ZooKeeper is not used for assignment. If the server named in the transition is not online, the
 * region is forced offline and false is returned. Otherwise the in-memory state is updated
 * according to the event type and, for some types, follow-up work is submitted to the executor
 * service.
 *
 * NOTE(review): processRegionInTransition() can pass a null regionInfo for merge events; several
 * statements below dereference or forward regionInfo without a null check -- confirm that those
 * paths cannot be reached with null.
 *
 * @param rt the parsed transition
 * @param regionInfo the region the transition refers to
 * @param expectedVersion znode version observed when the data was read
 * @return true if the transition was handled (or deliberately skipped because the region is
 *   already in transition); false if the hosting server is not online
 * @throws KeeperException propagated from ZooKeeper operations
 */
738 boolean processRegionsInTransition(
739 final RegionTransition rt, final HRegionInfo regionInfo,
740 final int expectedVersion) throws KeeperException {
741 EventType et = rt.getEventType();
742
743 final ServerName sn = rt.getServerName();
744 final byte[] regionName = rt.getRegionName();
745 final String encodedName = HRegionInfo.encodeRegionName(regionName);
746 final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
747 LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
748
// Already tracked as in transition: leave it alone for the meta region or in ZK-less mode.
749 if (regionStates.isRegionInTransition(encodedName)
750 && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
751 LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
752 + et + ", does nothing since the region is already in transition "
753 + regionStates.getRegionTransitionState(encodedName));
754
755 return true;
756 }
757 if (!serverManager.isServerOnline(sn)) {
758
759
760
// The server named in the znode is not online: force the region offline in memory.
761 LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
762 " was on deadserver; forcing offline");
763 if (regionStates.isRegionOnline(regionInfo)) {
764
765
766
767 regionStates.regionOffline(regionInfo);
768 sendRegionClosedNotification(regionInfo);
769 }
770
771 regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
772
773 if (regionInfo.isMetaRegion()) {
774
775
// For the meta region, the meta location is (re)written with this server and state OPEN.
776 MetaRegionTracker.setMetaLocation(watcher, sn, State.OPEN);
777 } else {
778
779
// For other regions, remember the server as the region's last host and expire it unless the
// server manager already considers it dead.
780 regionStates.setLastRegionServerOfRegion(sn, encodedName);
781
782 if (!serverManager.isServerDead(sn)) {
783 serverManager.expireServer(sn);
784 }
785 }
786 return false;
787 }
788 switch (et) {
789 case M_ZK_REGION_CLOSING:
790
791
// Record CLOSING, then finish the close asynchronously: under the region lock, unassign and,
// if the region ended up offline, assign it again.
792 final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
793 this.executorService.submit(
794 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
795 @Override
796 public void process() throws IOException {
797 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
798 try {
799 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
800 if (regionStates.isRegionOffline(regionInfo)) {
801 assign(regionInfo, true);
802 }
803 } finally {
804 lock.unlock();
805 }
806 }
807 });
808 break;
809
810 case RS_ZK_REGION_CLOSED:
811 case RS_ZK_REGION_FAILED_OPEN:
812
// Both outcomes are recorded as CLOSED and followed by an assignment request.
813 regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
814 invokeAssign(regionInfo);
815 break;
816
817 case M_ZK_REGION_OFFLINE:
818
// Record PENDING_OPEN, then asynchronously register a plan built with this server as its third
// argument and run the assignment under the region lock.
819 regionStates.updateRegionState(rt, State.PENDING_OPEN);
820 final RegionState rsOffline = regionStates.getRegionState(regionInfo);
821 this.executorService.submit(
822 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
823 @Override
824 public void process() throws IOException {
825 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
826 try {
827 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
828 addPlan(encodedName, plan);
829 assign(rsOffline, false, false);
830 } finally {
831 lock.unlock();
832 }
833 }
834 });
835 break;
836
837 case RS_ZK_REGION_OPENING:
838 regionStates.updateRegionState(rt, State.OPENING);
839 break;
840
841 case RS_ZK_REGION_OPENED:
842
843
844
// Record OPEN and run the opened-region handler inline (process() is called directly rather
// than being submitted to the executor).
845 regionStates.updateRegionState(rt, State.OPEN);
846 new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
847 break;
848 case RS_ZK_REQUEST_REGION_SPLIT:
849 case RS_ZK_REGION_SPLITTING:
850 case RS_ZK_REGION_SPLIT:
851
852
853
// Mark the region online on this server and SPLITTING, then delegate; the splitting znode is
// deleted when the split handler returns false.
854 regionStates.regionOnline(regionInfo, sn);
855 regionStates.updateRegionState(rt, State.SPLITTING);
856 if (!handleRegionSplitting(
857 rt, encodedName, prettyPrintedRegionName, sn)) {
858 deleteSplittingNode(encodedName, sn);
859 }
860 break;
861 case RS_ZK_REQUEST_REGION_MERGE:
862 case RS_ZK_REGION_MERGING:
863 case RS_ZK_REGION_MERGED:
// Delegate to the merge handler; the merging znode is deleted when it returns false.
864 if (!handleRegionMerging(
865 rt, encodedName, prettyPrintedRegionName, sn)) {
866 deleteMergingNode(encodedName, sn);
867 }
868 break;
869 default:
870 throw new IllegalStateException("Received region in state:" + et + " is not valid.");
871 }
872 LOG.info("Processed region " + prettyPrintedRegionName + " in state "
873 + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
874 + "server: " + sn);
875 return true;
876 }
877
878
879
880
881
882 public void removeClosedRegion(HRegionInfo hri) {
883 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
884 LOG.debug("Removed region from reopening regions because it was closed");
885 }
886 }
887
888
889
890
891
892
893
894
895
896
897
/**
 * Handles one region transition event, updating the in-memory region state and running the
 * matching follow-up handler.
 *
 * Transitions whose server equals HBCK_CODE_SERVERNAME are routed to handleHBCK(). Events from a
 * server that is not online are ignored unless their type is listed in ignoreStatesRSOffline.
 * M_ZK_REGION_OFFLINE events are ignored. Everything else is processed while holding the
 * per-region lock; an event that does not match the region's current state on that server is
 * logged at WARN and dropped.
 *
 * @param rt the transition to handle; a null value is logged and ignored
 * @param expectedVersion version forwarded to the OpenedRegionHandler for OPENED events
 */
898 void handleRegion(final RegionTransition rt, int expectedVersion) {
899 if (rt == null) {
900 LOG.warn("Unexpected NULL input for RegionTransition rt");
901 return;
902 }
903 final ServerName sn = rt.getServerName();
904
905 if (sn.equals(HBCK_CODE_SERVERNAME)) {
906 handleHBCK(rt);
907 return;
908 }
909 final long createTime = rt.getCreateTime();
910 final byte[] regionName = rt.getRegionName();
911 String encodedName = HRegionInfo.encodeRegionName(regionName);
912 String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
913
// Drop events from servers that are not online, except FAILED_OPEN and CLOSED.
914 if (!serverManager.isServerOnline(sn)
915 && !ignoreStatesRSOffline.contains(rt.getEventType())) {
916 LOG.warn("Attempted to handle region transition for server but " +
917 "it is not online: " + prettyPrintedRegionName + ", " + rt);
918 return;
919 }
920
// Snapshot of the state taken before the lock is acquired; compared with a fresh read below.
921 RegionState regionState =
922 regionStates.getRegionState(encodedName);
923 long startTime = System.currentTimeMillis();
924 if (LOG.isDebugEnabled()) {
// An event counts as late when its create time is more than 15000 before the current wall-clock
// millis (assumes createTime is epoch milliseconds -- TODO confirm).
925 boolean lateEvent = createTime < (startTime - 15000);
926 LOG.debug("Handling " + rt.getEventType() +
927 ", server=" + sn + ", region=" +
928 (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
929 (lateEvent ? ", which is more than 15 seconds late" : "") +
930 ", current_state=" + regionState);
931 }
932
933
// OFFLINE events are ignored here, before the region lock is taken.
934 if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
935 return;
936 }
937
938
939 Lock lock = locker.acquireLock(encodedName);
940 try {
// Re-read the state under the lock; warn if it changed while waiting, or if the wait exceeded 5s.
941 RegionState latestState =
942 regionStates.getRegionState(encodedName);
943 if ((regionState == null && latestState != null)
944 || (regionState != null && latestState == null)
945 || (regionState != null && latestState != null
946 && latestState.getState() != regionState.getState())) {
947 LOG.warn("Region state changed from " + regionState + " to "
948 + latestState + ", while acquiring lock");
949 }
950 long waitedTime = System.currentTimeMillis() - startTime;
951 if (waitedTime > 5000) {
952 LOG.warn("Took " + waitedTime + "ms to acquire the lock");
953 }
954 regionState = latestState;
955 switch (rt.getEventType()) {
956 case RS_ZK_REQUEST_REGION_SPLIT:
957 case RS_ZK_REGION_SPLITTING:
958 case RS_ZK_REGION_SPLIT:
// Delegate to the split handler; the splitting znode is deleted when it returns false.
959 if (!handleRegionSplitting(
960 rt, encodedName, prettyPrintedRegionName, sn)) {
961 deleteSplittingNode(encodedName, sn);
962 }
963 break;
964
965 case RS_ZK_REQUEST_REGION_MERGE:
966 case RS_ZK_REGION_MERGING:
967 case RS_ZK_REGION_MERGED:
968
969
// Delegate to the merge handler; the merging znode is deleted when it returns false.
970 if (!handleRegionMerging(
971 rt, encodedName, prettyPrintedRegionName, sn)) {
972 deleteMergingNode(encodedName, sn);
973 }
974 break;
975
976 case M_ZK_REGION_CLOSING:
977
978
// Only accepted when the region is PENDING_CLOSE or CLOSING on this server.
979 if (regionState == null
980 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
981 LOG.warn("Received CLOSING for " + prettyPrintedRegionName
982 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
983 + regionStates.getRegionState(encodedName));
984 return;
985 }
986
987 regionStates.updateRegionState(rt, State.CLOSING);
988 break;
989
990 case RS_ZK_REGION_CLOSED:
991
// Only accepted when the region is PENDING_CLOSE or CLOSING on this server.
992 if (regionState == null
993 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
994 LOG.warn("Received CLOSED for " + prettyPrintedRegionName
995 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
996 + regionStates.getRegionState(encodedName));
997 return;
998 }
999
1000
1001
// Run the closed-region handler inline, then record that it ran if handler tracking is enabled.
1002 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1003 updateClosedRegionHandlerTracker(regionState.getRegion());
1004 break;
1005
1006 case RS_ZK_REGION_FAILED_OPEN:
// Only accepted when the region is PENDING_OPEN or OPENING on this server.
1007 if (regionState == null
1008 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1009 LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1010 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1011 + regionStates.getRegionState(encodedName));
1012 return;
1013 }
// Count failed opens per region, creating the counter on the first failure.
1014 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1015 if (failedOpenCount == null) {
1016 failedOpenCount = new AtomicInteger();
1017
1018
1019
1020 failedOpenTracker.put(encodedName, failedOpenCount);
1021 }
// Once the count reaches maximumAttempts the region is marked FAILED_OPEN and the counter is
// discarded; otherwise the region is marked CLOSED and goes through the closed-region handler.
1022 if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1023 regionStates.updateRegionState(rt, State.FAILED_OPEN);
1024
1025
1026 failedOpenTracker.remove(encodedName);
1027 } else {
1028
1029 regionState = regionStates.updateRegionState(rt, State.CLOSED);
1030 if (regionState != null) {
1031
1032
1033 try {
// getRegionPlan is called with the failing server and 'true' before the handler runs; its
// result is not used here (presumably it refreshes the plan away from sn -- TODO confirm).
1034 getRegionPlan(regionState.getRegion(), sn, true);
1035 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1036 } catch (HBaseIOException e) {
1037 LOG.warn("Failed to get region plan", e);
1038 }
1039 }
1040 }
1041 break;
1042
1043 case RS_ZK_REGION_OPENING:
1044
1045
// Only accepted when the region is PENDING_OPEN or OPENING on this server.
1046 if (regionState == null
1047 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1048 LOG.warn("Received OPENING for " + prettyPrintedRegionName
1049 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1050 + regionStates.getRegionState(encodedName));
1051 return;
1052 }
1053
1054 regionStates.updateRegionState(rt, State.OPENING);
1055 break;
1056
1057 case RS_ZK_REGION_OPENED:
1058
// Only accepted when the region is PENDING_OPEN or OPENING on this server.
1059 if (regionState == null
1060 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1061 LOG.warn("Received OPENED for " + prettyPrintedRegionName
1062 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1063 + regionStates.getRegionState(encodedName));
1064
1065 if (regionState != null) {
1066
1067
1068
// The region is known but was not expected to open on this server: request an unassign with
// sn as the last argument.
1069 unassign(regionState.getRegion(), null, -1, null, false, sn);
1070 }
1071 return;
1072 }
1073
1074 regionState =
1075 regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1076 if (regionState != null) {
// A successful open resets the failed-open counter; the opened-region handler runs inline.
1077 failedOpenTracker.remove(encodedName);
1078 new OpenedRegionHandler(
1079 server, this, regionState.getRegion(), sn, expectedVersion).process();
1080 updateOpenedRegionHandlerTracker(regionState.getRegion());
1081 }
1082 break;
1083
1084 default:
1085 throw new IllegalStateException("Received event is not valid.");
1086 }
1087 } finally {
1088 lock.unlock();
1089 }
1090 }
1091
1092
1093 boolean wasClosedHandlerCalled(HRegionInfo hri) {
1094 AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1095
1096
1097
1098 return b == null ? false : b.compareAndSet(true, false);
1099 }
1100
1101
1102 boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1103 AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1104
1105
1106
1107 return b == null ? false : b.compareAndSet(true, false);
1108 }
1109
1110
1111 void initializeHandlerTrackers() {
1112 closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1113 openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1114 }
1115
1116 void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1117 if (closedRegionHandlerCalled != null) {
1118 closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1119 }
1120 }
1121
1122 void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1123 if (openedRegionHandlerCalled != null) {
1124 openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1125 }
1126 }
1127
1128
1129
1130
1131
1132
1133 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1134 if (!shouldAssignRegionsWithFavoredNodes) return;
1135
1136
1137 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1138 new HashMap<HRegionInfo, List<ServerName>>();
1139 for (HRegionInfo region : regions) {
1140 regionToFavoredNodes.put(region,
1141 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1142 }
1143 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1144 }
1145
1146
1147
1148
1149
1150
1151
1152 private void handleHBCK(RegionTransition rt) {
1153 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1154 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1155 ", server=" + rt.getServerName() + ", region=" +
1156 HRegionInfo.prettyPrint(encodedName));
1157 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1158 switch (rt.getEventType()) {
1159 case M_ZK_REGION_OFFLINE:
1160 HRegionInfo regionInfo;
1161 if (regionState != null) {
1162 regionInfo = regionState.getRegion();
1163 } else {
1164 try {
1165 byte [] name = rt.getRegionName();
1166 Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1167 regionInfo = p.getFirst();
1168 } catch (IOException e) {
1169 LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1170 return;
1171 }
1172 }
1173 LOG.info("HBCK repair is triggering assignment of region=" +
1174 regionInfo.getRegionNameAsString());
1175
1176 assign(regionInfo, false);
1177 break;
1178
1179 default:
1180 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1181 break;
1182 }
1183
1184 }
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200 @Override
1201 public void nodeCreated(String path) {
1202 handleAssignmentEvent(path);
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217 @Override
1218 public void nodeDataChanged(String path) {
1219 handleAssignmentEvent(path);
1220 }
1221
1222
1223
1224
1225
// Names of regions that currently have a task handed to zkEventWorkers which has not yet
// finished. Also used as the monitor guarding the submit/complete bookkeeping in
// zkEventWorkersSubmit.
private final Set<String> regionsInProgress = new HashSet<String>();

// Tasks that arrived while another task for the same region was still in progress, keyed by
// region name. Accessed under its own monitor.
private final LinkedHashMultimap <String, RegionRunnable>
  zkEventWorkerWaitingList = LinkedHashMultimap.create();

/**
 * A {@link Runnable} that is tied to a region; the region name is what
 * zkEventWorkersSubmit uses to decide whether the task may run now or must wait.
 */
private interface RegionRunnable extends Runnable{
  /**
   * @return the name of the region this task is about
   */
  String getRegionName();
}
1241
1242
1243
1244
1245
1246 protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1247
1248 synchronized (regionsInProgress) {
1249
1250
1251 if (regionsInProgress.contains(regRunnable.getRegionName())) {
1252 synchronized (zkEventWorkerWaitingList){
1253 zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1254 }
1255 return;
1256 }
1257
1258
1259 regionsInProgress.add(regRunnable.getRegionName());
1260 zkEventWorkers.submit(new Runnable() {
1261 @Override
1262 public void run() {
1263 try {
1264 regRunnable.run();
1265 } finally {
1266
1267
1268 synchronized (regionsInProgress) {
1269 regionsInProgress.remove(regRunnable.getRegionName());
1270 synchronized (zkEventWorkerWaitingList) {
1271 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1272 regRunnable.getRegionName());
1273 if (!waiting.isEmpty()) {
1274
1275 RegionRunnable toSubmit = waiting.iterator().next();
1276 zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1277 zkEventWorkersSubmit(toSubmit);
1278 }
1279 }
1280 }
1281 }
1282 }
1283 });
1284 }
1285 }
1286
1287 @Override
1288 public void nodeDeleted(final String path) {
1289 if (path.startsWith(watcher.assignmentZNode)) {
1290 final String regionName = ZKAssign.getRegionName(watcher, path);
1291 zkEventWorkersSubmit(new RegionRunnable() {
1292 @Override
1293 public String getRegionName() {
1294 return regionName;
1295 }
1296
1297 @Override
1298 public void run() {
1299 Lock lock = locker.acquireLock(regionName);
1300 try {
1301 RegionState rs = regionStates.getRegionTransitionState(regionName);
1302 if (rs == null) {
1303 rs = regionStates.getRegionState(regionName);
1304 if (rs == null || !rs.isMergingNew()) {
1305
1306 return;
1307 }
1308 }
1309
1310 HRegionInfo regionInfo = rs.getRegion();
1311 String regionNameStr = regionInfo.getRegionNameAsString();
1312 LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1313 boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
1314 ServerName serverName = rs.getServerName();
1315 if (serverManager.isServerOnline(serverName)) {
1316 if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1317 synchronized (regionStates) {
1318 regionOnline(regionInfo, serverName);
1319 if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1320
1321
1322 HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1323 HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1324 if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1325 LOG.warn("Split daughter region not in transition " + hri_a);
1326 }
1327 if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1328 LOG.warn("Split daughter region not in transition" + hri_b);
1329 }
1330 regionOffline(hri_a);
1331 regionOffline(hri_b);
1332 splitRegions.remove(regionInfo);
1333 }
1334 if (disabled) {
1335
1336 LOG.info("Opened " + regionNameStr
1337 + "but this table is disabled, triggering close of region");
1338 unassign(regionInfo);
1339 }
1340 }
1341 } else if (rs.isMergingNew()) {
1342 synchronized (regionStates) {
1343 String p = regionInfo.getEncodedName();
1344 PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1345 if (regions != null) {
1346 onlineMergingRegion(disabled, regions.getFirst(), serverName);
1347 onlineMergingRegion(disabled, regions.getSecond(), serverName);
1348 }
1349 }
1350 }
1351 }
1352 } finally {
1353 lock.unlock();
1354 }
1355 }
1356
1357 private void onlineMergingRegion(boolean disabled,
1358 final HRegionInfo hri, final ServerName serverName) {
1359 RegionState regionState = regionStates.getRegionState(hri);
1360 if (regionState != null && regionState.isMerging()
1361 && regionState.isOnServer(serverName)) {
1362 regionOnline(regionState.getRegion(), serverName);
1363 if (disabled) {
1364 unassign(hri);
1365 }
1366 }
1367 }
1368 });
1369 }
1370 }
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384 @Override
1385 public void nodeChildrenChanged(String path) {
1386 if (path.equals(watcher.assignmentZNode)) {
1387 zkEventWorkers.submit(new Runnable() {
1388 @Override
1389 public void run() {
1390 try {
1391
1392 List<String> children =
1393 ZKUtil.listChildrenAndWatchForNewChildren(
1394 watcher, watcher.assignmentZNode);
1395 if (children != null) {
1396 Stat stat = new Stat();
1397 for (String child : children) {
1398
1399
1400
1401 if (!regionStates.isRegionInTransition(child)) {
1402 ZKAssign.getDataAndWatch(watcher, child, stat);
1403 }
1404 }
1405 }
1406 } catch (KeeperException e) {
1407 server.abort("Unexpected ZK exception reading unassigned children", e);
1408 }
1409 }
1410 });
1411 }
1412 }
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423 void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1424 regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1425 }
1426
1427 void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1428 numRegionsOpened.incrementAndGet();
1429 regionStates.regionOnline(regionInfo, sn, openSeqNum);
1430
1431
1432 clearRegionPlan(regionInfo);
1433
1434 addToServersInUpdatingTimer(sn);
1435 balancer.regionOnline(regionInfo, sn);
1436
1437
1438 sendRegionOpenedNotification(regionInfo, sn);
1439 }
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449 private void handleAssignmentEvent(final String path) {
1450 if (path.startsWith(watcher.assignmentZNode)) {
1451 final String regionName = ZKAssign.getRegionName(watcher, path);
1452
1453 zkEventWorkersSubmit(new RegionRunnable() {
1454 @Override
1455 public String getRegionName() {
1456 return regionName;
1457 }
1458
1459 @Override
1460 public void run() {
1461 try {
1462 Stat stat = new Stat();
1463 byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1464 if (data == null) return;
1465
1466 RegionTransition rt = RegionTransition.parseFrom(data);
1467 handleRegion(rt, stat.getVersion());
1468 } catch (KeeperException e) {
1469 server.abort("Unexpected ZK exception reading unassigned node data", e);
1470 } catch (DeserializationException e) {
1471 server.abort("Unexpected exception deserializing node data", e);
1472 }
1473 }
1474 });
1475 }
1476 }
1477
1478
1479
1480
1481
1482
1483 private void addToServersInUpdatingTimer(final ServerName sn) {
1484 if (tomActivated){
1485 this.serversInUpdatingTimer.add(sn);
1486 }
1487 }
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502 private void updateTimers(final ServerName sn) {
1503 Preconditions.checkState(tomActivated);
1504 if (sn == null) return;
1505
1506
1507
1508
1509
1510 List<Map.Entry<String, RegionPlan>> rps;
1511 synchronized(this.regionPlans) {
1512 rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1513 }
1514
1515 for (Map.Entry<String, RegionPlan> e : rps) {
1516 if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1517 RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1518 if (regionState != null) {
1519 regionState.updateTimestampToNow();
1520 }
1521 }
1522 }
1523 }
1524
1525
1526
1527
1528
1529
1530
1531
1532 public void regionOffline(final HRegionInfo regionInfo) {
1533 regionOffline(regionInfo, null);
1534 }
1535
1536 public void offlineDisabledRegion(HRegionInfo regionInfo) {
1537 if (useZKForAssignment) {
1538
1539 LOG.debug("Table being disabled so deleting ZK node and removing from " +
1540 "regions in transition, skipping assignment of region " +
1541 regionInfo.getRegionNameAsString());
1542 String encodedName = regionInfo.getEncodedName();
1543 deleteNodeInStates(encodedName, "closed", null,
1544 EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1545 }
1546 regionOffline(regionInfo);
1547 }
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569 public void assign(HRegionInfo region, boolean setOfflineInZK) {
1570 assign(region, setOfflineInZK, false);
1571 }
1572
1573
1574
1575
1576 public void assign(HRegionInfo region,
1577 boolean setOfflineInZK, boolean forceNewPlan) {
1578 if (isDisabledorDisablingRegionInRIT(region)) {
1579 return;
1580 }
1581 if (this.serverManager.isClusterShutdown()) {
1582 LOG.info("Cluster shutdown is set; skipping assign of " +
1583 region.getRegionNameAsString());
1584 return;
1585 }
1586 String encodedName = region.getEncodedName();
1587 Lock lock = locker.acquireLock(encodedName);
1588 try {
1589 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1590 if (state != null) {
1591 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1592 LOG.info("Skip assigning " + region.getRegionNameAsString()
1593 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1594 + " is dead but not processed yet");
1595 return;
1596 }
1597 assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1598 }
1599 } finally {
1600 lock.unlock();
1601 }
1602 }
1603
1604
1605
1606
1607
1608
1609
1610 boolean assign(final ServerName destination, final List<HRegionInfo> regions) {
1611 long startTime = EnvironmentEdgeManager.currentTimeMillis();
1612 try {
1613 int regionCount = regions.size();
1614 if (regionCount == 0) {
1615 return true;
1616 }
1617 LOG.debug("Assigning " + regionCount + " region(s) to " + destination.toString());
1618 Set<String> encodedNames = new HashSet<String>(regionCount);
1619 for (HRegionInfo region : regions) {
1620 encodedNames.add(region.getEncodedName());
1621 }
1622
1623 List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1624 Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1625 try {
1626 AtomicInteger counter = new AtomicInteger(0);
1627 Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1628 OfflineCallback cb = new OfflineCallback(
1629 watcher, destination, counter, offlineNodesVersions);
1630 Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1631 List<RegionState> states = new ArrayList<RegionState>(regions.size());
1632 for (HRegionInfo region : regions) {
1633 String encodedName = region.getEncodedName();
1634 if (!isDisabledorDisablingRegionInRIT(region)) {
1635 RegionState state = forceRegionStateToOffline(region, false);
1636 boolean onDeadServer = false;
1637 if (state != null) {
1638 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1639 LOG.info("Skip assigning " + region.getRegionNameAsString()
1640 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1641 + " is dead but not processed yet");
1642 onDeadServer = true;
1643 } else if (!useZKForAssignment
1644 || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1645 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1646 plans.put(encodedName, plan);
1647 states.add(state);
1648 continue;
1649 }
1650 }
1651
1652 if (!onDeadServer) {
1653 LOG.info("failed to force region state to offline or "
1654 + "failed to set it offline in ZK, will reassign later: " + region);
1655 failedToOpenRegions.add(region);
1656 }
1657 }
1658
1659
1660 Lock lock = locks.remove(encodedName);
1661 lock.unlock();
1662 }
1663 if (useZKForAssignment) {
1664
1665 int total = states.size();
1666 for (int oldCounter = 0; !server.isStopped();) {
1667 int count = counter.get();
1668 if (oldCounter != count) {
1669 LOG.info(destination.toString() + " unassigned znodes=" + count + " of total="
1670 + total);
1671 oldCounter = count;
1672 }
1673 if (count >= total) break;
1674 Threads.sleep(5);
1675 }
1676 }
1677
1678 if (server.isStopped()) {
1679 return false;
1680 }
1681
1682
1683
1684 this.addPlans(plans);
1685
1686 List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1687 new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1688 for (RegionState state: states) {
1689 HRegionInfo region = state.getRegion();
1690 String encodedRegionName = region.getEncodedName();
1691 Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1692 if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1693 LOG.warn("failed to offline in zookeeper: " + region);
1694 failedToOpenRegions.add(region);
1695 Lock lock = locks.remove(encodedRegionName);
1696 lock.unlock();
1697 } else {
1698 regionStates.updateRegionState(
1699 region, State.PENDING_OPEN, destination);
1700 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1701 if (this.shouldAssignRegionsWithFavoredNodes) {
1702 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1703 }
1704 regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
1705 region, nodeVersion, favoredNodes));
1706 }
1707 }
1708
1709
1710 try {
1711
1712
1713 long maxWaitTime = System.currentTimeMillis() +
1714 this.server.getConfiguration().
1715 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1716 for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1717 try {
1718
1719 if (regionOpenInfos.isEmpty()) {
1720 break;
1721 }
1722 List<RegionOpeningState> regionOpeningStateList = serverManager
1723 .sendRegionOpen(destination, regionOpenInfos);
1724 if (regionOpeningStateList == null) {
1725
1726 return false;
1727 }
1728 for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1729 RegionOpeningState openingState = regionOpeningStateList.get(k);
1730 if (openingState != RegionOpeningState.OPENED) {
1731 HRegionInfo region = regionOpenInfos.get(k).getFirst();
1732 if (openingState == RegionOpeningState.ALREADY_OPENED) {
1733 processAlreadyOpenedRegion(region, destination);
1734 } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1735
1736 failedToOpenRegions.add(region);
1737 } else {
1738 LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1739 + openingState + " in assigning region " + region);
1740 }
1741 }
1742 }
1743 break;
1744 } catch (IOException e) {
1745 if (e instanceof RemoteException) {
1746 e = ((RemoteException)e).unwrapRemoteException();
1747 }
1748 if (e instanceof RegionServerStoppedException) {
1749 LOG.warn("The region server was shut down, ", e);
1750
1751 return false;
1752 } else if (e instanceof ServerNotRunningYetException) {
1753 long now = System.currentTimeMillis();
1754 if (now < maxWaitTime) {
1755 LOG.debug("Server is not yet up; waiting up to " +
1756 (maxWaitTime - now) + "ms", e);
1757 Thread.sleep(100);
1758 i--;
1759 continue;
1760 }
1761 } else if (e instanceof java.net.SocketTimeoutException
1762 && this.serverManager.isServerOnline(destination)) {
1763
1764
1765
1766
1767 if (LOG.isDebugEnabled()) {
1768 LOG.debug("Bulk assigner openRegion() to " + destination
1769 + " has timed out, but the regions might"
1770 + " already be opened on it.", e);
1771 }
1772
1773 Thread.sleep(100);
1774 i--;
1775 continue;
1776 }
1777 throw e;
1778 }
1779 }
1780 } catch (IOException e) {
1781
1782 LOG.info("Unable to communicate with " + destination
1783 + " in order to assign regions, ", e);
1784 return false;
1785 } catch (InterruptedException e) {
1786 throw new RuntimeException(e);
1787 }
1788 } finally {
1789 for (Lock lock : locks.values()) {
1790 lock.unlock();
1791 }
1792 }
1793
1794 if (!failedToOpenRegions.isEmpty()) {
1795 for (HRegionInfo region : failedToOpenRegions) {
1796 if (!regionStates.isRegionOnline(region)) {
1797 invokeAssign(region);
1798 }
1799 }
1800 }
1801 LOG.debug("Bulk assigning done for " + destination);
1802 return true;
1803 } finally {
1804 metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1805 }
1806 }
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
/**
 * Sends a CLOSE request for a region to the server hosting it, retrying up to
 * {@code maximumAttempts} times.
 * <p>
 * The target server is taken from {@code state} when it is non-null, otherwise {@code src}.
 * The method returns early when the master is stopped/aborted, when the target server is
 * not online, or when the server reports it no longer serves the region; in the latter two
 * cases the region is marked offline if a state was supplied. If all attempts are used up
 * and the timeout monitor is not activated, a region with a state is moved to FAILED_CLOSE.
 *
 * @param region region to close
 * @param state current state of the region, or {@code null}; several follow-up actions
 *          (marking offline, FAILED_CLOSE) only happen when it is non-null
 * @param versionOfClosingNode passed through to the close RPC
 * @param dest passed through to the close RPC
 * @param transitionInZK passed through to the close RPC; when {@code true} the
 *          closing/closed znode is also deleted if the region turns out not to be served
 * @param src server to contact when {@code state} is {@code null}
 */
private void unassign(final HRegionInfo region,
    final RegionState state, final int versionOfClosingNode,
    final ServerName dest, final boolean transitionInZK,
    final ServerName src) {
  // Local "server" is the region server to contact; "this.server" below is the master.
  ServerName server = src;
  if (state != null) {
    server = state.getServerName();
  }
  // Deadline for the "already in transition" waiting; -1 means not started yet.
  long maxWaitTime = -1;
  for (int i = 1; i <= this.maximumAttempts; i++) {
    if (this.server.isStopped() || this.server.isAborted()) {
      LOG.debug("Server stopped/aborted; skipping unassign of " + region);
      return;
    }
    // A region on a server that is not online needs no CLOSE RPC.
    if (!serverManager.isServerOnline(server)) {
      LOG.debug("Offline " + region.getRegionNameAsString()
        + ", no need to unassign since it's on a dead server: " + server);
      if (transitionInZK) {
        // Clean up the znode that would otherwise be left behind.
        deleteClosingOrClosedNode(region, server);
      }
      if (state != null) {
        regionOffline(region);
      }
      return;
    }
    try {
      // Send CLOSE RPC
      if (serverManager.sendRegionClose(server, region,
        versionOfClosingNode, dest, transitionInZK)) {
        LOG.debug("Sent CLOSE to " + server + " for region " +
          region.getRegionNameAsString());
        if (useZKForAssignment && !transitionInZK && state != null) {
          // Without a ZK transition, call unassign again; the recursion ends through one
          // of the return paths above/below (e.g. once the server throws
          // NotServingRegionException).
          unassign(region, state, versionOfClosingNode,
            dest, transitionInZK, src);
        }
        return;
      }
      // The RPC returned false; fall through and use up one attempt.
      LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
        region.getRegionNameAsString());
    } catch (Throwable t) {
      if (t instanceof RemoteException) {
        t = ((RemoteException)t).unwrapRemoteException();
      }
      boolean logRetries = true;
      if (t instanceof NotServingRegionException
          || t instanceof RegionServerStoppedException
          || t instanceof ServerNotRunningYetException) {
        // The server is not serving the region (any more); treat it as closed.
        LOG.debug("Offline " + region.getRegionNameAsString()
          + ", it's not any more on " + server, t);
        if (transitionInZK) {
          deleteClosingOrClosedNode(region, server);
        }
        if (state != null) {
          regionOffline(region);
        }
        return;
      } else if ((t instanceof FailedServerException) || (state != null &&
          t instanceof RegionAlreadyInTransitionException)) {
        long sleepTime = 0;
        Configuration conf = this.server.getConfiguration();
        if(t instanceof FailedServerException) {
          // Sleep for the configured failed-server expiry time plus one millisecond.
          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
        } else {
          // RegionAlreadyInTransitionException; state is non-null on this branch.
          LOG.debug("update " + state + " the timestamp.");
          state.updateTimestampToNow();
          if (maxWaitTime < 0) {
            maxWaitTime =
                EnvironmentEdgeManager.currentTimeMillis()
                    + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
                      DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
          }
          long now = EnvironmentEdgeManager.currentTimeMillis();
          if (now < maxWaitTime) {
            LOG.debug("Region is already in transition; "
              + "waiting up to " + (maxWaitTime - now) + "ms", t);
            sleepTime = 100;
            i--; // reset the try count
            logRetries = false;
          }
        }
        try {
          if (sleepTime > 0) {
            Thread.sleep(sleepTime);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Failed to unassign "
            + region.getRegionNameAsString() + " since interrupted", ie);
          // Preserve the interrupt for callers and give up.
          Thread.currentThread().interrupt();
          if (!tomActivated && state != null) {
            regionStates.updateRegionState(region, State.FAILED_CLOSE);
          }
          return;
        }
      }

      if (logRetries) {
        LOG.info("Server " + server + " returned " + t + " for "
          + region.getRegionNameAsString() + ", try=" + i
          + " of " + this.maximumAttempts, t);
        // Presume retry or server will expire.
      }
    }
  }
  // Run out of attempts
  if (!tomActivated && state != null) {
    regionStates.updateRegionState(region, State.FAILED_CLOSE);
  }
}
1934
1935
1936
1937
/**
 * Brings a region into a state from which it can be assigned (OFFLINE or CLOSED), closing
 * it first where necessary.
 * <p>
 * The switch below relies on deliberate fall-through: each case either returns or continues
 * with the handling of the next, "more offline" state.
 *
 * @param region region to prepare for assignment
 * @param forceNewPlan when {@code false}, a region that is already open or in an
 *          opening/closing transition is left alone and {@code null} is returned
 * @return the region state to assign from, or {@code null} if the region must not be
 *         assigned now
 */
private RegionState forceRegionStateToOffline(
    final HRegionInfo region, final boolean forceNewPlan) {
  RegionState state = regionStates.getRegionState(region);
  if (state == null) {
    LOG.warn("Assigning a region not in region states: " + region);
    state = regionStates.createRegionState(region);
  }

  // Remember the server before any unassign below replaces the state.
  ServerName sn = state.getServerName();
  if (forceNewPlan && LOG.isDebugEnabled()) {
    LOG.debug("Force region state offline " + state);
  }

  switch (state.getState()) {
  case OPEN:
  case OPENING:
  case PENDING_OPEN:
  case CLOSING:
  case PENDING_CLOSE:
    if (!forceNewPlan) {
      LOG.debug("Skip assigning " +
        region + ", it is already " + state);
      return null;
    }
    // Intentional fall-through: with forceNewPlan the region is closed first.
  case FAILED_CLOSE:
  case FAILED_OPEN:
    unassign(region, state, -1, null, false, null);
    // Re-read the state, which unassign may have changed.
    state = regionStates.getRegionState(region);
    if (state.isFailedClose()) {
      // If we can't close the region, we can't re-assign
      // it so as to avoid possible double assignment/data loss.
      LOG.info("Skip assigning " +
        region + ", we couldn't close it: " + state);
      return null;
    }
    // Intentional fall-through: the region is now treated like an offline one.
  case OFFLINE:
    // With ZooKeeper-based assignment, leave a region whose server is dead but not yet
    // processed (and which hbase:meta confirms) alone; it is only moved to OFFLINE here
    // if it is not in transition.
    if (useZKForAssignment
        && regionStates.isServerDeadAndNotProcessed(sn)
        && wasRegionOnDeadServerByMeta(region, sn)) {
      if (!regionStates.isRegionInTransition(region)) {
        LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
        regionStates.updateRegionState(region, State.OFFLINE);
      }
      LOG.info("Skip assigning " + region.getRegionNameAsString()
          + ", it is on a dead but not processed yet server: " + sn);
      return null;
    }
    // Intentional fall-through: an offline region is assignable like a closed one.
  case CLOSED:
    break;
  default:
    // Any other state is not expected when assigning.
    LOG.error("Trying to assign region " + region
      + ", which is " + state);
    return null;
  }
  return state;
}
2002
2003 private boolean wasRegionOnDeadServerByMeta(
2004 final HRegionInfo region, final ServerName sn) {
2005 try {
2006 if (region.isMetaRegion()) {
2007 ServerName server = catalogTracker.getMetaLocation();
2008 return regionStates.isServerDeadAndNotProcessed(server);
2009 }
2010 while (!server.isStopped()) {
2011 try {
2012 catalogTracker.waitForMeta();
2013 Result r = MetaReader.getRegionResult(catalogTracker, region.getRegionName());
2014 if (r == null || r.isEmpty()) return false;
2015 ServerName server = HRegionInfo.getServerName(r);
2016 return regionStates.isServerDeadAndNotProcessed(server);
2017 } catch (IOException ioe) {
2018 LOG.info("Received exception accessing hbase:meta during force assign "
2019 + region.getRegionNameAsString() + ", retrying", ioe);
2020 }
2021 }
2022 } catch (InterruptedException e) {
2023 Thread.currentThread().interrupt();
2024 LOG.info("Interrupted accessing hbase:meta", e);
2025 }
2026
2027 return regionStates.isServerDeadAndNotProcessed(sn);
2028 }
2029
2030
2031
2032
2033
2034
2035
/**
 * Assigns a single region, retrying up to {@code maximumAttempts} times (without limit for
 * a hbase:meta region, whose attempt counter is reset when it reaches the maximum).
 * <p>
 * Each iteration obtains a region plan if none is held, optionally sets the region offline
 * in ZooKeeper, moves the region to PENDING_OPEN and sends the open RPC to the plan's
 * destination. Depending on the outcome the loop returns, retries on the same server, or
 * obtains a new plan. When attempts run out and the timeout monitor is not activated, the
 * region is moved to FAILED_OPEN. The elapsed time is always reported to the assignment
 * metrics.
 *
 * @param state current state of the region to assign
 * @param setOfflineInZK whether the region must be set offline in ZooKeeper before the open
 *          RPC is sent
 * @param forceNewPlan whether the first plan lookup should force a new plan
 */
private void assign(RegionState state,
    final boolean setOfflineInZK, final boolean forceNewPlan) {
  long startTime = EnvironmentEdgeManager.currentTimeMillis();
  try {
    Configuration conf = server.getConfiguration();
    RegionState currentState = state;
    // -1 means the offline znode has not been (successfully) created yet.
    int versionOfOfflineNode = -1;
    RegionPlan plan = null;
    // Deadline for "hold" retries on the same server; -1 means not started yet.
    long maxWaitTime = -1;
    HRegionInfo region = state.getRegion();
    RegionOpeningState regionOpenState;
    Throwable previousException = null;
    for (int i = 1; i <= maximumAttempts; i++) {
      if (server.isStopped() || server.isAborted()) {
        LOG.info("Skip assigning " + region.getRegionNameAsString()
          + ", the server is stopped/aborted");
        return;
      }

      if (plan == null) { // Get a server for the region at first
        try {
          plan = getRegionPlan(region, forceNewPlan);
        } catch (HBaseIOException e) {
          LOG.warn("Failed to get region plan", e);
        }
      }

      if (plan == null) {
        LOG.warn("Unable to determine a plan to assign " + region);
        if (tomActivated){
          this.timeoutMonitor.setAllRegionServersOffline(true);
        } else {
          if (region.isMetaRegion()) {
            if (i == maximumAttempts) {
              i = 0; // re-set attempt count to 0 for at least 1 retry

              LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
                " after maximumAttempts (" + this.maximumAttempts +
                "). Reset attempts count and continue retrying.");
            }
            waitForRetryingMetaAssignment();
            continue;
          }

          regionStates.updateRegionState(region, State.FAILED_OPEN);
        }
        return;
      }
      if (setOfflineInZK && versionOfOfflineNode == -1) {
        // get the version of the znode after setting it to OFFLINE.
        // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
        versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
        if (versionOfOfflineNode != -1) {
          if (isDisabledorDisablingRegionInRIT(region)) {
            return;
          }
          // The region is about to be assigned, so make sure its table is marked ENABLED
          // unless it is already enabling or enabled.
          TableName tableName = region.getTable();
          if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
            LOG.debug("Setting table " + tableName + " to ENABLED state.");
            setEnabledTable(tableName);
          }
        }
      }
      if (setOfflineInZK && versionOfOfflineNode == -1) {
        LOG.info("Unable to set offline in ZooKeeper to assign " + region);
        // Setting offline in ZK failed. Use up one attempt and try again, unless the
        // master has been aborted, in which case control falls through to the code below.
        if (!server.isAborted()) {
          continue;
        }
      }
      LOG.info("Assigning " + region.getRegionNameAsString() +
          " to " + plan.getDestination().toString());
      // Transition RegionState to PENDING_OPEN
      currentState = regionStates.updateRegionState(region,
          State.PENDING_OPEN, plan.getDestination());

      boolean needNewPlan;
      final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
          " to " + plan.getDestination();
      try {
        List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
        if (this.shouldAssignRegionsWithFavoredNodes) {
          favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
        }
        regionOpenState = serverManager.sendRegionOpen(
            plan.getDestination(), region, versionOfOfflineNode, favoredNodes);

        if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
          // Failed opening this region, looping again on a new server.
          needNewPlan = true;
          LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
              " trying to assign elsewhere instead; " +
              "try=" + i + " of " + this.maximumAttempts);
        } else {
          // we're done
          if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
            processAlreadyOpenedRegion(region, plan.getDestination());
          }
          return;
        }

      } catch (Throwable t) {
        if (t instanceof RemoteException) {
          t = ((RemoteException) t).unwrapRemoteException();
        }
        previousException = t;

        // "hold": stay on the same server and wait a little, because the region is
        // already in transition there or the server is not running yet.
        boolean hold = (t instanceof RegionAlreadyInTransitionException ||
            t instanceof ServerNotRunningYetException);

        // "retry": the RPC timed out but the destination is still online, so try the
        // same server again without counting the attempt.
        boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
            && this.serverManager.isServerOnline(plan.getDestination()));

        if (hold) {
          LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
            "try=" + i + " of " + this.maximumAttempts, t);

          if (maxWaitTime < 0) {
            // First hold: compute the deadline from the matching configuration value.
            if (t instanceof RegionAlreadyInTransitionException) {
              maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
                + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
                  DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
            } else {
              maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
                + this.server.getConfiguration().getLong(
                  "hbase.regionserver.rpc.startup.waittime", 60000);
            }
          }
          try {
            needNewPlan = false;
            long now = EnvironmentEdgeManager.currentTimeMillis();
            if (now < maxWaitTime) {
              LOG.debug("Server is not yet up or region is already in transition; "
                + "waiting up to " + (maxWaitTime - now) + "ms", t);
              Thread.sleep(100);
              i--; // reset the try count
            } else if (!(t instanceof RegionAlreadyInTransitionException)) {
              LOG.debug("Server is not up for a while; try a new one", t);
              needNewPlan = true;
            }
          } catch (InterruptedException ie) {
            LOG.warn("Failed to assign "
                + region.getRegionNameAsString() + " since interrupted", ie);
            // Preserve the interrupt for callers and give up.
            Thread.currentThread().interrupt();
            if (!tomActivated) {
              regionStates.updateRegionState(region, State.FAILED_OPEN);
            }
            return;
          }
        } else if (retry) {
          needNewPlan = false;
          i--; // we want to retry as many times as needed as long as the RS is not dead.
          LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
        } else {
          needNewPlan = true;
          LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
              " try=" + i + " of " + this.maximumAttempts, t);
        }
      }

      if (i == this.maximumAttempts) {
        // For meta region, we have to keep retrying until succeeding
        if (region.isMetaRegion()) {
          i = 0; // re-set attempt count to 0 for at least 1 retry
          LOG.warn(assignMsg +
              ", trying to assign a hbase:meta region reached to maximumAttempts (" +
              this.maximumAttempts + "). Reset attempt counts and continue retrying.");
          waitForRetryingMetaAssignment();
        }
        else {
          // Last attempt used up: skip the new-plan lookup; the loop condition ends the
          // loop and the code after it handles the failure.
          continue;
        }
      }

      // A new plan is only looked up when the failure above asked for one.
      if (needNewPlan) {
        // Force a new plan and reassign. Will return null if no servers.
        // The new plan could be the same as the existing plan since we don't
        // exclude the server of the original plan, which should not be
        // excluded since it could be the only server up now.
        RegionPlan newPlan = null;
        try {
          newPlan = getRegionPlan(region, true);
        } catch (HBaseIOException e) {
          LOG.warn("Failed to get region plan", e);
        }
        if (newPlan == null) {
          if (tomActivated) {
            this.timeoutMonitor.setAllRegionServersOffline(true);
          } else {
            regionStates.updateRegionState(region, State.FAILED_OPEN);
          }
          LOG.warn("Unable to find a viable location to assign region " +
              region.getRegionNameAsString());
          return;
        }

        if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
          // Clean out plan we failed execute and one that doesn't look like it'll
          // succeed anyways; we need a new plan!
          // Transition back to OFFLINE
          currentState = regionStates.updateRegionState(region, State.OFFLINE);
          // Force the offline znode to be set again for the new destination.
          versionOfOfflineNode = -1;
          plan = newPlan;
        } else if(plan.getDestination().equals(newPlan.getDestination()) &&
            previousException instanceof FailedServerException) {
          // Same destination after a FailedServerException: wait for the configured
          // failed-server expiry time (plus one millisecond) before the next attempt.
          try {
            LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
              " to the same failed server.");
            Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
          } catch (InterruptedException ie) {
            LOG.warn("Failed to assign "
                + region.getRegionNameAsString() + " since interrupted", ie);
            Thread.currentThread().interrupt();
            if (!tomActivated) {
              regionStates.updateRegionState(region, State.FAILED_OPEN);
            }
            return;
          }
        }
      }
    }
    // Run out of attempts
    if (!tomActivated) {
      regionStates.updateRegionState(region, State.FAILED_OPEN);
    }
  } finally {
    metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
  }
}
2288
2289 private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2290
2291
2292
2293 LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2294 + " to " + sn);
2295 String encodedName = region.getEncodedName();
2296 deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2297 regionStates.regionOnline(region, sn);
2298 }
2299
2300 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2301 TableName tableName = region.getTable();
2302 boolean disabled = this.zkTable.isDisabledTable(tableName);
2303 if (disabled || this.zkTable.isDisablingTable(tableName)) {
2304 LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
2305 " skipping assign of " + region.getRegionNameAsString());
2306 offlineDisabledRegion(region);
2307 return true;
2308 }
2309 return false;
2310 }
2311
2312
2313
2314
2315 private void waitForRetryingMetaAssignment() {
2316 try {
2317 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2318 } catch (InterruptedException e) {
2319 LOG.error("Got exception while waiting for hbase:meta assignment");
2320 Thread.currentThread().interrupt();
2321 }
2322 }
2323
2324
2325
2326
2327
2328
2329
2330
2331 private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2332 if (!state.isClosed() && !state.isOffline()) {
2333 String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2334 this.server.abort(msg, new IllegalStateException(msg));
2335 return -1;
2336 }
2337 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2338 int versionOfOfflineNode;
2339 try {
2340
2341 versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2342 state.getRegion(), destination);
2343 if (versionOfOfflineNode == -1) {
2344 LOG.warn("Attempted to create/force node into OFFLINE state before "
2345 + "completing assignment but failed to do so for " + state);
2346 return -1;
2347 }
2348 } catch (KeeperException e) {
2349 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2350 return -1;
2351 }
2352 return versionOfOfflineNode;
2353 }
2354
2355
2356
2357
2358
2359
/**
 * Convenience overload of {@code getRegionPlan(region, serverToExclude, forceNewPlan)}
 * that passes {@code null} as the server to exclude.
 *
 * @param region the region to get a plan for
 * @param forceNewPlan forwarded unchanged to the three-argument overload
 * @return whatever the three-argument overload returns (may be {@code null})
 * @throws HBaseIOException propagated from the three-argument overload
 */
private RegionPlan getRegionPlan(final HRegionInfo region,
    final boolean forceNewPlan) throws HBaseIOException {
  return getRegionPlan(region, null, forceNewPlan);
}
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
/**
 * Returns the plan to use for assigning {@code region}, generating and caching a
 * random plan when needed.
 *
 * <p>A new random plan is generated when {@code forceNewPlan} is set, when no plan
 * is cached for the region, when the cached plan has no destination, or when the
 * cached destination is not among the current candidate servers.
 *
 * @param region the region to assign
 * @param serverToExclude passed to {@code serverManager.createDestinationServersList};
 *   the two-argument overload passes {@code null}
 * @param forceNewPlan if {@code true}, ignore any cached plan and generate a new one
 * @return the plan to use, or {@code null} when there is no candidate destination
 *   server or the newly generated plan has no destination
 * @throws HBaseIOException declared by this method; presumably raised by the
 *   balancer -- TODO confirm
 */
private RegionPlan getRegionPlan(final HRegionInfo region,
    final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
  // The encoded region name is the key used in the regionPlans map.
  final String encodedName = region.getEncodedName();
  final List<ServerName> destServers =
    serverManager.createDestinationServersList(serverToExclude);

  if (destServers.isEmpty()){
    LOG.warn("Can't move " + encodedName +
      ", there is no destination server available.");
    return null;
  }

  RegionPlan randomPlan = null;
  boolean newPlan = false;
  RegionPlan existingPlan;

  // Lookup and (possible) replacement of the cached plan happen under one lock.
  synchronized (this.regionPlans) {
    existingPlan = this.regionPlans.get(encodedName);

    if (existingPlan != null && existingPlan.getDestination() != null) {
      LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
        + " destination server is " + existingPlan.getDestination() +
          " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
    }

    if (forceNewPlan
        || existingPlan == null
        || existingPlan.getDestination() == null
        || !destServers.contains(existingPlan.getDestination())) {
      newPlan = true;
      randomPlan = new RegionPlan(region, null,
          balancer.randomAssignment(region, destServers));
      if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
        List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
        regions.add(region);
        try {
          processFavoredNodes(regions);
        } catch (IOException ie) {
          // Best effort: a favored-nodes failure does not prevent the assignment.
          LOG.warn("Ignoring exception in processFavoredNodes " + ie);
        }
      }
      // Note: the plan is cached even if its destination turns out to be null;
      // that case is reported to the caller below by returning null.
      this.regionPlans.put(encodedName, randomPlan);
    }
  }

  if (newPlan) {
    if (randomPlan.getDestination() == null) {
      LOG.warn("Can't find a destination for " + encodedName);
      return null;
    }
    LOG.debug("No previous transition plan found (or ignoring " +
      "an existing plan) for " + region.getRegionNameAsString() +
      "; generated random plan=" + randomPlan + "; " +
      serverManager.countOfRegionServers() +
             " (online=" + serverManager.getOnlineServers().size() +
             ", available=" + destServers.size() + ") available servers" +
             ", forceNewPlan=" + forceNewPlan);
      return randomPlan;
    }
  LOG.debug("Using pre-existing plan for " +
    region.getRegionNameAsString() + "; plan=" + existingPlan);
  return existingPlan;
}
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
/**
 * Unassigns the specified region without forcing: delegates to
 * {@code unassign(region, false)}.
 *
 * @param region the region to unassign
 */
public void unassign(HRegionInfo region) {
  unassign(region, false);
}
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471 public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2472
2473 LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2474 + " (offlining), current state: " + regionStates.getRegionState(region));
2475
2476 String encodedName = region.getEncodedName();
2477
2478 int versionOfClosingNode = -1;
2479
2480
2481 ReentrantLock lock = locker.acquireLock(encodedName);
2482 RegionState state = regionStates.getRegionTransitionState(encodedName);
2483 boolean reassign = true;
2484 try {
2485 if (state == null) {
2486
2487
2488 state = regionStates.getRegionState(encodedName);
2489 if (state != null && state.isUnassignable()) {
2490 LOG.info("Attempting to unassign " + state + ", ignored");
2491
2492 return;
2493 }
2494
2495 try {
2496 if (state == null || state.getServerName() == null) {
2497
2498
2499 LOG.warn("Attempting to unassign a region not in RegionStates"
2500 + region.getRegionNameAsString() + ", offlined");
2501 regionOffline(region);
2502 return;
2503 }
2504 if (useZKForAssignment) {
2505 versionOfClosingNode = ZKAssign.createNodeClosing(
2506 watcher, region, state.getServerName());
2507 if (versionOfClosingNode == -1) {
2508 LOG.info("Attempting to unassign " +
2509 region.getRegionNameAsString() + " but ZK closing node "
2510 + "can't be created.");
2511 reassign = false;
2512 return;
2513 }
2514 }
2515 } catch (KeeperException e) {
2516 if (e instanceof NodeExistsException) {
2517
2518
2519
2520
2521 NodeExistsException nee = (NodeExistsException)e;
2522 String path = nee.getPath();
2523 try {
2524 if (isSplitOrSplittingOrMergedOrMerging(path)) {
2525 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2526 "skipping unassign because region no longer exists -- its split or merge");
2527 reassign = false;
2528 return;
2529 }
2530 } catch (KeeperException.NoNodeException ke) {
2531 LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2532 "; presuming split and that the region to unassign, " +
2533 encodedName + ", no longer exists -- confirm", ke);
2534 return;
2535 } catch (KeeperException ke) {
2536 LOG.error("Unexpected zk state", ke);
2537 } catch (DeserializationException de) {
2538 LOG.error("Failed parse", de);
2539 }
2540 }
2541
2542 server.abort("Unexpected ZK exception creating node CLOSING", e);
2543 reassign = false;
2544 return;
2545 }
2546 state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2547 } else if (state.isFailedOpen()) {
2548
2549 regionOffline(region);
2550 return;
2551 } else if (force && state.isPendingCloseOrClosing()) {
2552 LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2553 " which is already " + state.getState() +
2554 " but forcing to send a CLOSE RPC again ");
2555 if (state.isFailedClose()) {
2556 state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2557 }
2558 state.updateTimestampToNow();
2559 } else {
2560 LOG.debug("Attempting to unassign " +
2561 region.getRegionNameAsString() + " but it is " +
2562 "already in transition (" + state.getState() + ", force=" + force + ")");
2563 return;
2564 }
2565
2566 unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2567 } finally {
2568 lock.unlock();
2569
2570
2571 if (reassign && regionStates.isRegionOffline(region)) {
2572 assign(region, true);
2573 }
2574 }
2575 }
2576
/**
 * Unassigns the specified region with no destination: delegates to
 * {@code unassign(region, force, null)}.
 *
 * @param region the region to unassign
 * @param force forwarded unchanged to the three-argument overload
 */
public void unassign(HRegionInfo region, boolean force){
  unassign(region, force, null);
}
2580
2581
2582
2583
/**
 * Asks {@code deleteNodeInStates} to delete the region's node when it is in the
 * {@code M_ZK_REGION_CLOSING} or {@code RS_ZK_REGION_CLOSED} state.
 *
 * @param region the region whose node should be removed
 * @param sn the server name forwarded to {@code deleteNodeInStates}
 */
public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
  String encodedName = region.getEncodedName();
  deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
    EventType.RS_ZK_REGION_CLOSED);
}
2589
2590
2591
2592
2593
2594
2595
2596 private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2597 throws KeeperException, DeserializationException {
2598 boolean result = false;
2599
2600
2601 byte [] data = ZKAssign.getData(watcher, path);
2602 if (data == null) {
2603 LOG.info("Node " + path + " is gone");
2604 return false;
2605 }
2606 RegionTransition rt = RegionTransition.parseFrom(data);
2607 switch (rt.getEventType()) {
2608 case RS_ZK_REQUEST_REGION_SPLIT:
2609 case RS_ZK_REGION_SPLIT:
2610 case RS_ZK_REGION_SPLITTING:
2611 case RS_ZK_REQUEST_REGION_MERGE:
2612 case RS_ZK_REGION_MERGED:
2613 case RS_ZK_REGION_MERGING:
2614 result = true;
2615 break;
2616 default:
2617 LOG.info("Node " + path + " is in " + rt.getEventType());
2618 break;
2619 }
2620 return result;
2621 }
2622
2623
2624
2625
2626
2627
/**
 * Returns the current value of the {@code numRegionsOpened} counter.
 *
 * @return the current counter value
 */
public int getNumRegionsOpened() {
  return numRegionsOpened.get();
}
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640 public boolean waitForAssignment(HRegionInfo regionInfo)
2641 throws InterruptedException {
2642 while (!regionStates.isRegionOnline(regionInfo)) {
2643 if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2644 || this.server.isStopped()) {
2645 return false;
2646 }
2647
2648
2649
2650
2651 regionStates.waitForUpdate(100);
2652 }
2653 return true;
2654 }
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
/**
 * Assigns the hbase:meta region: first deletes the recorded meta location through
 * {@code MetaRegionTracker.deleteMetaLocation}, then calls
 * {@code assign(HRegionInfo.FIRST_META_REGIONINFO, true)}.
 *
 * @throws KeeperException declared by this method; presumably raised when deleting
 *   the meta location fails -- TODO confirm
 */
public void assignMeta() throws KeeperException {
  MetaRegionTracker.deleteMetaLocation(this.watcher);
  assign(HRegionInfo.FIRST_META_REGIONINFO, true);
}
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679 public void assign(Map<HRegionInfo, ServerName> regions)
2680 throws IOException, InterruptedException {
2681 if (regions == null || regions.isEmpty()) {
2682 return;
2683 }
2684 List<ServerName> servers = serverManager.createDestinationServersList();
2685 if (servers == null || servers.isEmpty()) {
2686 throw new IOException("Found no destination server to assign region(s)");
2687 }
2688
2689
2690 Map<ServerName, List<HRegionInfo>> bulkPlan =
2691 balancer.retainAssignment(regions, servers);
2692
2693 assign(regions.size(), servers.size(),
2694 "retainAssignment=true", bulkPlan);
2695 }
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705 public void assign(List<HRegionInfo> regions)
2706 throws IOException, InterruptedException {
2707 if (regions == null || regions.isEmpty()) {
2708 return;
2709 }
2710
2711 List<ServerName> servers = serverManager.createDestinationServersList();
2712 if (servers == null || servers.isEmpty()) {
2713 throw new IOException("Found no destination server to assign region(s)");
2714 }
2715
2716
2717 Map<ServerName, List<HRegionInfo>> bulkPlan
2718 = balancer.roundRobinAssignment(regions, servers);
2719 processFavoredNodes(regions);
2720
2721 assign(regions.size(), servers.size(),
2722 "round-robin=true", bulkPlan);
2723 }
2724
2725 private void assign(int regions, int totalServers,
2726 String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2727 throws InterruptedException, IOException {
2728
2729 int servers = bulkPlan.size();
2730 if (servers == 1 || (regions < bulkAssignThresholdRegions
2731 && servers < bulkAssignThresholdServers)) {
2732
2733
2734
2735 if (LOG.isTraceEnabled()) {
2736 LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2737 " region(s) to " + servers + " server(s)");
2738 }
2739 for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2740 if (!assign(plan.getKey(), plan.getValue())) {
2741 for (HRegionInfo region: plan.getValue()) {
2742 if (!regionStates.isRegionOnline(region)) {
2743 invokeAssign(region);
2744 }
2745 }
2746 }
2747 }
2748 } else {
2749 LOG.info("Bulk assigning " + regions + " region(s) across "
2750 + totalServers + " server(s), " + message);
2751
2752
2753 BulkAssigner ba = new GeneralBulkAssigner(
2754 this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2755 ba.bulkAssign();
2756 LOG.info("Bulk assigning done");
2757 }
2758 }
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770 private void assignAllUserRegions(Set<TableName> disabledOrDisablingOrEnabling)
2771 throws IOException, InterruptedException, KeeperException {
2772
2773
2774
2775
2776 Map<HRegionInfo, ServerName> allRegions;
2777 SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
2778 new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
2779 snapshotOfRegionAssignment.initialize();
2780 allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
2781 if (allRegions == null || allRegions.isEmpty()) {
2782 return;
2783 }
2784
2785
2786 boolean retainAssignment = server.getConfiguration().
2787 getBoolean("hbase.master.startup.retainassign", true);
2788
2789 if (retainAssignment) {
2790 assign(allRegions);
2791 } else {
2792 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2793 assign(regions);
2794 }
2795
2796 for (HRegionInfo hri : allRegions.keySet()) {
2797 TableName tableName = hri.getTable();
2798 if (!zkTable.isEnabledTable(tableName)) {
2799 setEnabledTable(tableName);
2800 }
2801 }
2802 }
2803
2804
2805
2806
2807
2808
2809
2810 boolean waitUntilNoRegionsInTransition(final long timeout)
2811 throws InterruptedException {
2812
2813
2814
2815
2816
2817
2818 final long endTime = System.currentTimeMillis() + timeout;
2819
2820 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2821 && endTime > System.currentTimeMillis()) {
2822 regionStates.waitForUpdate(100);
2823 }
2824
2825 return !regionStates.isRegionsInTransition();
2826 }
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
/**
 * Rebuilds the in-memory region states from a full scan of the catalog and returns
 * the regions that were recorded as OPEN on servers that are not currently online.
 *
 * @return map from each not-online server to the OPEN regions recorded on it
 * @throws IOException propagated from the catalog scan or state handling
 * @throws KeeperException propagated from reading table states
 */
Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
  // Build the table-state sets used below:
  //   disabledOrEnablingTables       = disabled + enabling
  //   disabledOrDisablingOrEnabling  = disabling + disabled + enabling
  Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
  Set<TableName> disabledTables = ZKTable.getDisabledTables(watcher);
  Set<TableName> disabledOrEnablingTables = new HashSet<TableName>(disabledTables);
  disabledOrEnablingTables.addAll(enablingTables);
  // Note: the set returned by getDisablingTables is extended in place.
  Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
  disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);

  // One Result per catalog row.
  List<Result> results = MetaReader.fullScan(this.catalogTracker);
  // Servers currently known to be online.
  Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
  // Regions recorded as OPEN on servers that are not online, keyed by server.
  Map<ServerName, List<HRegionInfo>> offlineServers =
    new TreeMap<ServerName, List<HRegionInfo>>();
  for (Result result : results) {
    HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
    // Rows without region info are skipped.
    if (regionInfo == null) continue;
    State state = RegionStateStore.getRegionState(result);
    ServerName regionLocation = RegionStateStore.getRegionServer(result);
    if (disabledTables.contains(regionInfo.getTable())) {
      // Regions of disabled tables are recorded without a location.
      regionLocation = null;
    }
    regionStates.createRegionState(regionInfo, state, regionLocation);
    if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
      // Only regions recorded as OPEN are processed further.
      continue;
    }
    TableName tableName = regionInfo.getTable();
    if (!onlineServers.contains(regionLocation)) {
      // The region's server is not online: collect the region under that server.
      // NOTE(review): regionLocation is used as a TreeMap key here -- confirm it
      // can never be null on this path.
      List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
      if (offlineRegions == null) {
        offlineRegions = new ArrayList<HRegionInfo>(1);
        offlineServers.put(regionLocation, offlineRegions);
      }
      if (useZKForAssignment) {
        regionStates.regionOffline(regionInfo);
      }
      offlineRegions.add(regionInfo);
    } else if (!disabledOrEnablingTables.contains(tableName)) {
      // Server is online and the table is neither disabled nor enabling:
      // record the region as open/online and tell the balancer about it.
      regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
      regionStates.regionOnline(regionInfo, regionLocation);
      balancer.regionOnline(regionInfo, regionLocation);
    } else if (useZKForAssignment) {
      regionStates.regionOffline(regionInfo);
    }
    // Mark the table enabled unless it is disabled, disabling or enabling,
    // or is already marked enabled.
    if (!disabledOrDisablingOrEnabling.contains(tableName)
        && !getZKTable().isEnabledTable(tableName)) {
      setEnabledTable(tableName);
    }

  }
  return offlineServers;
}
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908 private void recoverTableInDisablingState()
2909 throws KeeperException, TableNotFoundException, IOException {
2910 Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher);
2911 if (disablingTables.size() != 0) {
2912 for (TableName tableName : disablingTables) {
2913
2914 LOG.info("The table " + tableName
2915 + " is in DISABLING state. Hence recovering by moving the table"
2916 + " to DISABLED state.");
2917 new DisableTableHandler(this.server, tableName, catalogTracker,
2918 this, tableLockManager, true).prepare().process();
2919 }
2920 }
2921 }
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931 private void recoverTableInEnablingState()
2932 throws KeeperException, TableNotFoundException, IOException {
2933 Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2934 if (enablingTables.size() != 0) {
2935 for (TableName tableName : enablingTables) {
2936
2937 LOG.info("The table " + tableName
2938 + " is in ENABLING state. Hence recovering by moving the table"
2939 + " to ENABLED state.");
2940
2941
2942 EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2943 catalogTracker, this, tableLockManager, true);
2944 try {
2945 eth.prepare();
2946 } catch (TableNotFoundException e) {
2947 LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2948 continue;
2949 }
2950 eth.process();
2951 }
2952 }
2953 }
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970 private void processDeadServersAndRecoverLostRegions(
2971 Map<ServerName, List<HRegionInfo>> deadServers)
2972 throws IOException, KeeperException {
2973 if (deadServers != null) {
2974 for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2975 ServerName serverName = server.getKey();
2976
2977 regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
2978 if (!serverManager.isServerDead(serverName)) {
2979 serverManager.expireServer(serverName);
2980 }
2981 }
2982 }
2983
2984 List<String> nodes = useZKForAssignment ?
2985 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
2986 : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
2987 if (nodes != null && !nodes.isEmpty()) {
2988 for (String encodedRegionName : nodes) {
2989 processRegionInTransition(encodedRegionName, null);
2990 }
2991 } else if (!useZKForAssignment) {
2992 processRegionInTransitionZkLess();
2993 }
2994 }
2995
2996 void processRegionInTransitionZkLess() {
2997
2998
2999
3000
3001
3002 Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3003 for (RegionState regionState : rits.values()) {
3004 LOG.info("Processing " + regionState);
3005 ServerName serverName = regionState.getServerName();
3006
3007
3008 if (serverName != null
3009 && !serverManager.getOnlineServers().containsKey(serverName)) {
3010 LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3011 continue;
3012 }
3013 HRegionInfo regionInfo = regionState.getRegion();
3014 State state = regionState.getState();
3015
3016 switch (state) {
3017 case CLOSED:
3018 invokeAssign(regionInfo);
3019 break;
3020 case PENDING_OPEN:
3021 retrySendRegionOpen(regionState);
3022 break;
3023 case PENDING_CLOSE:
3024 retrySendRegionClose(regionState);
3025 break;
3026 case FAILED_CLOSE:
3027 case FAILED_OPEN:
3028 invokeUnassign(regionInfo);
3029 break;
3030 default:
3031
3032 }
3033 }
3034 }
3035
3036
3037
3038
3039
/**
 * Submits a recovery task that re-sends the open request for a region to the
 * server recorded in {@code regionState}.
 *
 * <p>The task holds the per-region lock while it runs and keeps retrying as long
 * as the target server is online and this server is neither stopped nor aborted.
 *
 * @param regionState the state of the region whose open request is re-sent
 */
private void retrySendRegionOpen(final RegionState regionState) {
  this.executorService.submit(
    new EventHandler(server, EventType.M_MASTER_RECOVERY) {
      @Override
      public void process() throws IOException {
        HRegionInfo hri = regionState.getRegion();
        ServerName serverName = regionState.getServerName();
        ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
        try {
          while (serverManager.isServerOnline(serverName)
              && !server.isStopped() && !server.isAborted()) {
            try {
              // Favored nodes are only looked up when that feature is enabled.
              List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
              if (shouldAssignRegionsWithFavoredNodes) {
                favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
              }
              RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
                serverName, hri, -1, favoredNodes);

              if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
                // The server reported it failed to open the region: re-assign it
                // with a new plan.
                LOG.debug("Got failed_opening in retry sendRegionOpen for "
                  + regionState + ", re-assign it");
                invokeAssign(hri, true);
              }
              // The request went through (whatever the outcome); stop retrying.
              return;
            } catch (Throwable t) {
              if (t instanceof RemoteException) {
                t = ((RemoteException) t).unwrapRemoteException();
              }
              // Timeouts and failed-server errors are retried after a short pause.
              if (t instanceof java.net.SocketTimeoutException
                  || t instanceof FailedServerException) {
                Threads.sleep(100);
                continue;
              }
              // Any other failure: give up on this server and re-assign.
              LOG.debug("Got exception in retry sendRegionOpen for "
                + regionState + ", re-assign it", t);
              invokeAssign(hri);
              return;
            }
          }
        } finally {
          lock.unlock();
        }
      }
    });
}
3090
3091
3092
3093
3094
/**
 * Submits a recovery task that re-sends the close request for a region to the
 * server recorded in {@code regionState}.
 *
 * <p>The task holds the per-region lock while it runs and keeps retrying as long
 * as the target server is online and this server is neither stopped nor aborted.
 *
 * @param regionState the state of the region whose close request is re-sent
 */
private void retrySendRegionClose(final RegionState regionState) {
  this.executorService.submit(
    new EventHandler(server, EventType.M_MASTER_RECOVERY) {
      @Override
      public void process() throws IOException {
        HRegionInfo hri = regionState.getRegion();
        ServerName serverName = regionState.getServerName();
        ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
        try {
          while (serverManager.isServerOnline(serverName)
              && !server.isStopped() && !server.isAborted()) {
            try {
              if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
                // The close request was not accepted: go through unassign again.
                LOG.debug("Got false in retry sendRegionClose for "
                  + regionState + ", re-close it");
                invokeUnAssign(hri);
              }
              // The request went through (whatever the outcome); stop retrying.
              return;
            } catch (Throwable t) {
              if (t instanceof RemoteException) {
                t = ((RemoteException) t).unwrapRemoteException();
              }
              // Timeouts and failed-server errors are retried after a short pause.
              if (t instanceof java.net.SocketTimeoutException
                  || t instanceof FailedServerException) {
                Threads.sleep(100);
                continue;
              }
              // NotServingRegionException / RegionAlreadyInTransitionException need
              // no further action here; anything else triggers another unassign.
              if (!(t instanceof NotServingRegionException
                  || t instanceof RegionAlreadyInTransitionException)) {
                LOG.debug("Got exception in retry sendRegionClose for "
                  + regionState + ", re-close it", t);
                invokeUnAssign(hri);
              }
              return;
            }
          }
        } finally {
          lock.unlock();
        }
      }
    });
}
3142
3143
3144
3145
3146
3147
3148
3149
3150 public void updateRegionsInTransitionMetrics() {
3151 long currentTime = System.currentTimeMillis();
3152 int totalRITs = 0;
3153 int totalRITsOverThreshold = 0;
3154 long oldestRITTime = 0;
3155 int ritThreshold = this.server.getConfiguration().
3156 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3157 for (RegionState state: regionStates.getRegionsInTransition().values()) {
3158 totalRITs++;
3159 long ritTime = currentTime - state.getStamp();
3160 if (ritTime > ritThreshold) {
3161 totalRITsOverThreshold++;
3162 }
3163 if (oldestRITTime < ritTime) {
3164 oldestRITTime = ritTime;
3165 }
3166 }
3167 if (this.metricsAssignmentManager != null) {
3168 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3169 this.metricsAssignmentManager.updateRITCount(totalRITs);
3170 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3171 }
3172 }
3173
3174
3175
3176
/**
 * Removes the cached plan for the given region, if any.
 *
 * @param region the region whose plan (keyed by encoded name) is removed
 */
void clearRegionPlan(final HRegionInfo region) {
  // regionPlans is accessed under its own monitor.
  synchronized (this.regionPlans) {
    this.regionPlans.remove(region.getEncodedName());
  }
}
3182
3183
3184
3185
3186
3187
/**
 * Waits for the given region to leave regions-in-transition, with no time limit:
 * delegates to the two-argument overload with a timeout of {@code -1}, which that
 * overload treats as "no deadline". The overload's boolean result is discarded.
 *
 * @param hri the region to wait on
 * @throws IOException declared by this method
 * @throws InterruptedException if interrupted while waiting
 */
public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
    throws IOException, InterruptedException {
  waitOnRegionToClearRegionsInTransition(hri, -1L);
}
3192
3193
3194
3195
3196
3197
3198
3199
3200 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3201 throws InterruptedException {
3202 if (!regionStates.isRegionInTransition(hri)) return true;
3203 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
3204 + timeOut;
3205
3206
3207 LOG.info("Waiting for " + hri.getEncodedName() +
3208 " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3209 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3210 regionStates.waitForUpdate(100);
3211 if (EnvironmentEdgeManager.currentTimeMillis() > end) {
3212 LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3213 return false;
3214 }
3215 }
3216 if (this.server.isStopped()) {
3217 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3218 return false;
3219 }
3220 return true;
3221 }
3222
3223
3224
3225
3226
3227 public class TimerUpdater extends Chore {
3228
3229 public TimerUpdater(final int period, final Stoppable stopper) {
3230 super("AssignmentTimerUpdater", period, stopper);
3231 }
3232
3233 @Override
3234 protected void chore() {
3235 Preconditions.checkState(tomActivated);
3236 ServerName serverToUpdateTimer = null;
3237 while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
3238 if (serverToUpdateTimer == null) {
3239 serverToUpdateTimer = serversInUpdatingTimer.first();
3240 } else {
3241 serverToUpdateTimer = serversInUpdatingTimer
3242 .higher(serverToUpdateTimer);
3243 }
3244 if (serverToUpdateTimer == null) {
3245 break;
3246 }
3247 updateTimers(serverToUpdateTimer);
3248 serversInUpdatingTimer.remove(serverToUpdateTimer);
3249 }
3250 }
3251 }
3252
3253
3254
3255
/**
 * Chore that periodically inspects the regions in transition and acts on those
 * that have been in their current state for at least {@code timeout} milliseconds.
 * Requires {@code tomActivated} to be set.
 */
public class TimeoutMonitor extends Chore {
  // Remembers whether no destination server was available on the previous run
  // (also set from the assign path through setAllRegionServersOffline).
  private boolean allRegionServersOffline = false;
  private ServerManager serverManager;
  // Time, in milliseconds, a region may stay in one state before being acted on.
  private final int timeout;

  /**
   * Creates the monitor.
   *
   * @param period the period passed to {@code Chore}
   * @param stopper the stoppable passed to {@code Chore}
   * @param serverManager used to list destination servers and check server liveness
   * @param timeout time, in milliseconds, after which a region in transition is
   *   considered timed out
   */
  public TimeoutMonitor(final int period, final Stoppable stopper,
      ServerManager serverManager,
      final int timeout) {
    super("AssignmentTimeoutMonitor", period, stopper);
    this.timeout = timeout;
    this.serverManager = serverManager;
  }

  /** Records whether all region servers are currently considered offline. */
  private synchronized void setAllRegionServersOffline(
    boolean allRegionServersOffline) {
    this.allRegionServersOffline = allRegionServersOffline;
  }

  @Override
  protected void chore() {
    Preconditions.checkState(tomActivated);
    boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();

    long now = System.currentTimeMillis();

    // Iterate over the keys and re-fetch each state, since a region may have left
    // transition in the meantime (hence the null check).
    for (String regionName : regionStates.getRegionsInTransition().keySet()) {
      RegionState regionState = regionStates.getRegionTransitionState(regionName);
      if (regionState == null) continue;

      if (regionState.getStamp() + timeout <= now) {
        // The region has been in its current state for too long.
        actOnTimeOut(regionState);
      } else if (this.allRegionServersOffline && !noRSAvailable) {
        // Servers were unavailable before and some are available again: act now
        // on regions that have no plan or whose planned destination is not online.
        // NOTE(review): allRegionServersOffline is read here without the lock its
        // setter takes, and regionPlans is read without synchronizing on it --
        // confirm this is intended.
        RegionPlan existingPlan = regionPlans.get(regionName);
        if (existingPlan == null
            || !this.serverManager.isServerOnline(existingPlan
                .getDestination())) {
          actOnTimeOut(regionState);
        }
      }
    }
    setAllRegionServersOffline(noRSAvailable);
  }

  /**
   * Takes the state-specific action for a region considered timed out.
   *
   * @param regionState the state of the timed-out region
   * @throws IllegalStateException if the state is not handled by the switch below
   */
  private void actOnTimeOut(RegionState regionState) {
    HRegionInfo regionInfo = regionState.getRegion();
    LOG.info("Regions in transition timed out:  " + regionState);

    switch (regionState.getState()) {
    case CLOSED:
      LOG.info("Region " + regionInfo.getEncodedName()
          + " has been CLOSED for too long, waiting on queued "
          + "ClosedRegionHandler to run or server shutdown");
      // Only refresh the timestamp; no other action is taken for CLOSED.
      regionState.updateTimestampToNow();
      break;
    case OFFLINE:
      LOG.info("Region has been OFFLINE for too long, " + "reassigning "
          + regionInfo.getRegionNameAsString() + " to a random server");
      invokeAssign(regionInfo);
      break;
    case PENDING_OPEN:
      LOG.info("Region has been PENDING_OPEN for too "
          + "long, reassigning region=" + regionInfo.getRegionNameAsString());
      invokeAssign(regionInfo);
      break;
    case OPENING:
      processOpeningState(regionInfo);
      break;
    case OPEN:
      LOG.error("Region has been OPEN for too long, " +
          "we don't know where region was opened so can't do anything");
      regionState.updateTimestampToNow();
      break;

    case PENDING_CLOSE:
      LOG.info("Region has been PENDING_CLOSE for too "
          + "long, running forced unassign again on region="
          + regionInfo.getRegionNameAsString());
      invokeUnassign(regionInfo);
      break;
    case CLOSING:
      LOG.info("Region has been CLOSING for too " +
        "long, this should eventually complete or the server will " +
        "expire, send RPC again");
      invokeUnassign(regionInfo);
      break;

    // No action is taken for these states.
    case SPLIT:
    case SPLITTING:
    case FAILED_OPEN:
    case FAILED_CLOSE:
    case MERGING:
      break;

    default:
      throw new IllegalStateException("Received event is not valid.");
    }
  }
}
3369
3370 private void processOpeningState(HRegionInfo regionInfo) {
3371 LOG.info("Region has been OPENING for too long, reassigning region="
3372 + regionInfo.getRegionNameAsString());
3373
3374 try {
3375 String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
3376 Stat stat = new Stat();
3377 byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
3378 if (data == null) {
3379 LOG.warn("Data is null, node " + node + " no longer exists");
3380 return;
3381 }
3382 RegionTransition rt = RegionTransition.parseFrom(data);
3383 EventType et = rt.getEventType();
3384 if (et == EventType.RS_ZK_REGION_OPENED) {
3385 LOG.debug("Region has transitioned to OPENED, allowing "
3386 + "watched event handlers to process");
3387 return;
3388 } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
3389 LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
3390 return;
3391 }
3392 invokeAssign(regionInfo);
3393 } catch (KeeperException ke) {
3394 LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
3395 } catch (DeserializationException e) {
3396 LOG.error("Unexpected exception parsing CLOSING region", e);
3397 }
3398 }
3399
/**
 * Submits an asynchronous assign of the region, requesting a new plan: delegates
 * to {@code invokeAssign(regionInfo, true)}.
 *
 * @param regionInfo the region to assign
 */
void invokeAssign(HRegionInfo regionInfo) {
  invokeAssign(regionInfo, true);
}
3403
/**
 * Submits an {@code AssignCallable} for the region to
 * {@code threadPoolExecutorService}.
 *
 * @param regionInfo the region to assign
 * @param newPlan forwarded to the {@code AssignCallable}
 */
void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
  threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
}
3407
/**
 * Submits an {@code UnAssignCallable} for the region to
 * {@code threadPoolExecutorService}.
 *
 * @param regionInfo the region to unassign
 */
void invokeUnAssign(HRegionInfo regionInfo) {
  threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
3411
3412 private void invokeUnassign(HRegionInfo regionInfo) {
3413 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3414 }
3415
3416 public boolean isCarryingMeta(ServerName serverName) {
3417 return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3418 }
3419
3420
3421
3422
3423
3424
3425
3426
3427
3428
3429
3430 private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3431 RegionTransition rt = null;
3432 try {
3433 byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3434
3435 rt = data == null? null: RegionTransition.parseFrom(data);
3436 } catch (KeeperException e) {
3437 server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3438 } catch (DeserializationException e) {
3439 server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3440 }
3441
3442 ServerName addressFromZK = rt != null? rt.getServerName(): null;
3443 if (addressFromZK != null) {
3444
3445 boolean matchZK = addressFromZK.equals(serverName);
3446 LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3447 " current=" + serverName + ", matches=" + matchZK);
3448 return matchZK;
3449 }
3450
3451 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3452 boolean matchAM = (addressFromAM != null &&
3453 addressFromAM.equals(serverName));
3454 LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3455 " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3456 " server being checked: " + serverName);
3457
3458 return matchAM;
3459 }
3460
3461
3462
3463
3464
3465
3466 public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3467
3468 synchronized (this.regionPlans) {
3469 for (Iterator <Map.Entry<String, RegionPlan>> i =
3470 this.regionPlans.entrySet().iterator(); i.hasNext();) {
3471 Map.Entry<String, RegionPlan> e = i.next();
3472 ServerName otherSn = e.getValue().getDestination();
3473
3474 if (otherSn != null && otherSn.equals(sn)) {
3475
3476 i.remove();
3477 }
3478 }
3479 }
3480 List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3481 for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3482 HRegionInfo hri = it.next();
3483 String encodedName = hri.getEncodedName();
3484
3485
3486 Lock lock = locker.acquireLock(encodedName);
3487 try {
3488 RegionState regionState =
3489 regionStates.getRegionTransitionState(encodedName);
3490 if (regionState == null
3491 || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3492 || !(regionState.isFailedClose() || regionState.isOffline()
3493 || regionState.isPendingOpenOrOpening())) {
3494 LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3495 + " on the dead server any more: " + sn);
3496 it.remove();
3497 } else {
3498 try {
3499
3500 ZKAssign.deleteNodeFailSilent(watcher, hri);
3501 } catch (KeeperException ke) {
3502 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3503 }
3504 if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
3505 regionStates.regionOffline(hri);
3506 it.remove();
3507 continue;
3508 }
3509
3510 regionStates.updateRegionState(hri, State.OFFLINE);
3511 }
3512 } finally {
3513 lock.unlock();
3514 }
3515 }
3516 return regions;
3517 }
3518
3519
3520
3521
3522 public void balance(final RegionPlan plan) {
3523 HRegionInfo hri = plan.getRegionInfo();
3524 TableName tableName = hri.getTable();
3525 if (zkTable.isDisablingOrDisabledTable(tableName)) {
3526 LOG.info("Ignored moving region of disabling/disabled table "
3527 + tableName);
3528 return;
3529 }
3530
3531
3532 String encodedName = hri.getEncodedName();
3533 ReentrantLock lock = locker.acquireLock(encodedName);
3534 try {
3535 if (!regionStates.isRegionOnline(hri)) {
3536 RegionState state = regionStates.getRegionState(encodedName);
3537 LOG.info("Ignored moving region not assigned: " + hri + ", "
3538 + (state == null ? "not in region states" : state));
3539 return;
3540 }
3541 synchronized (this.regionPlans) {
3542 this.regionPlans.put(plan.getRegionName(), plan);
3543 }
3544 unassign(hri, false, plan.getDestination());
3545 } finally {
3546 lock.unlock();
3547 }
3548 }
3549
3550 public void stop() {
3551 shutdown();
3552 if (tomActivated){
3553 this.timeoutMonitor.interrupt();
3554 this.timerUpdater.interrupt();
3555 }
3556 }
3557
3558
3559
3560
3561 public void shutdown() {
3562
3563 synchronized (zkEventWorkerWaitingList){
3564 zkEventWorkerWaitingList.clear();
3565 }
3566 threadPoolExecutorService.shutdownNow();
3567 zkEventWorkers.shutdownNow();
3568 regionStateStore.stop();
3569 }
3570
3571 protected void setEnabledTable(TableName tableName) {
3572 try {
3573 this.zkTable.setEnabledTable(tableName);
3574 } catch (KeeperException e) {
3575
3576 String errorMsg = "Unable to ensure that the table " + tableName
3577 + " will be" + " enabled because of a ZooKeeper issue";
3578 LOG.error(errorMsg);
3579 this.server.abort(errorMsg, e);
3580 }
3581 }
3582
3583
3584
3585
3586
3587
3588
3589 private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3590 final AsyncCallback.StringCallback cb, final ServerName destination) {
3591 if (!state.isClosed() && !state.isOffline()) {
3592 this.server.abort("Unexpected state trying to OFFLINE; " + state,
3593 new IllegalStateException());
3594 return false;
3595 }
3596 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3597 try {
3598 ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3599 destination, cb, state);
3600 } catch (KeeperException e) {
3601 if (e instanceof NodeExistsException) {
3602 LOG.warn("Node for " + state.getRegion() + " already exists");
3603 } else {
3604 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3605 }
3606 return false;
3607 }
3608 return true;
3609 }
3610
3611 private boolean deleteNodeInStates(String encodedName,
3612 String desc, ServerName sn, EventType... types) {
3613 try {
3614 for (EventType et: types) {
3615 if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3616 return true;
3617 }
3618 }
3619 LOG.info("Failed to delete the " + desc + " node for "
3620 + encodedName + ". The node type may not match");
3621 } catch (NoNodeException e) {
3622 if (LOG.isDebugEnabled()) {
3623 LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3624 }
3625 } catch (KeeperException ke) {
3626 server.abort("Unexpected ZK exception deleting " + desc
3627 + " node for the region " + encodedName, ke);
3628 }
3629 return false;
3630 }
3631
3632 private void deleteMergingNode(String encodedName, ServerName sn) {
3633 deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3634 EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3635 }
3636
3637 private void deleteSplittingNode(String encodedName, ServerName sn) {
3638 deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3639 EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3640 }
3641
3642 private void onRegionFailedOpen(
3643 final HRegionInfo hri, final ServerName sn) {
3644 String encodedName = hri.getEncodedName();
3645 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3646 if (failedOpenCount == null) {
3647 failedOpenCount = new AtomicInteger();
3648
3649
3650
3651 failedOpenTracker.put(encodedName, failedOpenCount);
3652 }
3653 if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion() ) {
3654 regionStates.updateRegionState(hri, State.FAILED_OPEN);
3655
3656
3657 failedOpenTracker.remove(encodedName);
3658 }
3659 else {
3660 if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3661
3662
3663 LOG.warn("Failed to open the hbase:meta region " +
3664 hri.getRegionNameAsString() + " after" +
3665 failedOpenCount.get() + " retries. Continue retrying.");
3666 }
3667
3668
3669 RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3670 if (regionState != null) {
3671
3672
3673 Set<TableName> disablingOrDisabled = null;
3674 try {
3675 disablingOrDisabled = ZKTable.getDisablingTables(watcher);
3676 disablingOrDisabled.addAll(ZKTable.getDisabledTables(watcher));
3677 } catch (KeeperException e) {
3678 server.abort("Cannot retrieve info about disabling or disabled tables ", e);
3679 }
3680 if (disablingOrDisabled.contains(hri.getTable())) {
3681 offlineDisabledRegion(hri);
3682 return;
3683 }
3684
3685 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3686
3687 removeClosedRegion(hri);
3688 try {
3689 getRegionPlan(hri, sn, true);
3690 } catch (HBaseIOException e) {
3691 LOG.warn("Failed to get region plan", e);
3692 }
3693 invokeAssign(hri, false);
3694 }
3695 }
3696 }
3697
3698 private void onRegionOpen(
3699 final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3700 regionOnline(hri, sn, openSeqNum);
3701 if (useZKForAssignment) {
3702 try {
3703
3704 ZKAssign.deleteNodeFailSilent(watcher, hri);
3705 } catch (KeeperException ke) {
3706 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3707 }
3708 }
3709
3710
3711 failedOpenTracker.remove(hri.getEncodedName());
3712 if (isTableDisabledOrDisabling(hri.getTable())) {
3713 invokeUnAssign(hri);
3714 }
3715 }
3716
3717 private void onRegionClosed(final HRegionInfo hri) {
3718 if (isTableDisabledOrDisabling(hri.getTable())) {
3719 offlineDisabledRegion(hri);
3720 return;
3721 }
3722 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3723
3724 removeClosedRegion(hri);
3725 invokeAssign(hri, false);
3726 }
3727
3728 private String onRegionSplit(ServerName sn, TransitionCode code,
3729 HRegionInfo p, HRegionInfo a, HRegionInfo b) {
3730 RegionState rs_p = regionStates.getRegionState(p);
3731 RegionState rs_a = regionStates.getRegionState(a);
3732 RegionState rs_b = regionStates.getRegionState(b);
3733 if (!(rs_p.isOpenOrSplittingOnServer(sn)
3734 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3735 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3736 return "Not in state good for split";
3737 }
3738
3739 regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3740 regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3741 regionStates.updateRegionState(p, State.SPLITTING);
3742
3743 if (code == TransitionCode.SPLIT) {
3744 if (TEST_SKIP_SPLIT_HANDLING) {
3745 return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3746 }
3747 regionOffline(p, State.SPLIT);
3748 regionOnline(a, sn, 1);
3749 regionOnline(b, sn, 1);
3750
3751
3752 if (isTableDisabledOrDisabling(p.getTable())) {
3753 invokeUnAssign(a);
3754 invokeUnAssign(b);
3755 }
3756 } else if (code == TransitionCode.SPLIT_PONR) {
3757 try {
3758 regionStateStore.splitRegion(p, a, b, sn);
3759 } catch (IOException ioe) {
3760 LOG.info("Failed to record split region " + p.getShortNameToLog());
3761 return "Failed to record the splitting in meta";
3762 }
3763 } else if (code == TransitionCode.SPLIT_REVERTED) {
3764 regionOnline(p, sn);
3765 regionOffline(a);
3766 regionOffline(b);
3767
3768 if (isTableDisabledOrDisabling(p.getTable())) {
3769 invokeUnAssign(p);
3770 }
3771 }
3772 return null;
3773 }
3774
3775 private boolean isTableDisabledOrDisabling(TableName t) {
3776 Set<TableName> disablingOrDisabled = null;
3777 try {
3778 disablingOrDisabled = ZKTable.getDisablingTables(watcher);
3779 disablingOrDisabled.addAll(ZKTable.getDisabledTables(watcher));
3780 } catch (KeeperException e) {
3781 server.abort("Cannot retrieve info about disabling or disabled tables ", e);
3782 }
3783 return disablingOrDisabled.contains(t) ? true : false;
3784 }
3785
3786 private String onRegionMerge(ServerName sn, TransitionCode code,
3787 HRegionInfo p, HRegionInfo a, HRegionInfo b) {
3788 RegionState rs_p = regionStates.getRegionState(p);
3789 RegionState rs_a = regionStates.getRegionState(a);
3790 RegionState rs_b = regionStates.getRegionState(b);
3791 if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3792 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3793 return "Not in state good for merge";
3794 }
3795
3796 regionStates.updateRegionState(a, State.MERGING);
3797 regionStates.updateRegionState(b, State.MERGING);
3798 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3799
3800 String encodedName = p.getEncodedName();
3801 if (code == TransitionCode.READY_TO_MERGE) {
3802 mergingRegions.put(encodedName,
3803 new PairOfSameType<HRegionInfo>(a, b));
3804 } else if (code == TransitionCode.MERGED) {
3805 mergingRegions.remove(encodedName);
3806 regionOffline(a, State.MERGED);
3807 regionOffline(b, State.MERGED);
3808 regionOnline(p, sn, 1);
3809
3810
3811 if (isTableDisabledOrDisabling(p.getTable())) {
3812 invokeUnAssign(p);
3813 }
3814 } else if (code == TransitionCode.MERGE_PONR) {
3815 try {
3816 regionStateStore.mergeRegions(p, a, b, sn);
3817 } catch (IOException ioe) {
3818 LOG.info("Failed to record merged region " + p.getShortNameToLog());
3819 return "Failed to record the merging in meta";
3820 }
3821 } else {
3822 mergingRegions.remove(encodedName);
3823 regionOnline(a, sn);
3824 regionOnline(b, sn);
3825 regionOffline(p);
3826
3827 if (isTableDisabledOrDisabling(p.getTable())) {
3828 invokeUnAssign(a);
3829 invokeUnAssign(b);
3830 }
3831 }
3832 return null;
3833 }
3834
3835
3836
3837
3838
3839 private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3840 final String prettyPrintedRegionName, final ServerName sn) {
3841 if (!serverManager.isServerOnline(sn)) {
3842 LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3843 return false;
3844 }
3845 byte [] payloadOfMerging = rt.getPayload();
3846 List<HRegionInfo> mergingRegions;
3847 try {
3848 mergingRegions = HRegionInfo.parseDelimitedFrom(
3849 payloadOfMerging, 0, payloadOfMerging.length);
3850 } catch (IOException e) {
3851 LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3852 + " payload for " + prettyPrintedRegionName);
3853 return false;
3854 }
3855 assert mergingRegions.size() == 3;
3856 HRegionInfo p = mergingRegions.get(0);
3857 HRegionInfo hri_a = mergingRegions.get(1);
3858 HRegionInfo hri_b = mergingRegions.get(2);
3859
3860 RegionState rs_p = regionStates.getRegionState(p);
3861 RegionState rs_a = regionStates.getRegionState(hri_a);
3862 RegionState rs_b = regionStates.getRegionState(hri_b);
3863
3864 if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3865 && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3866 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3867 LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3868 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3869 return false;
3870 }
3871
3872 EventType et = rt.getEventType();
3873 if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3874 try {
3875 if (RegionMergeTransaction.transitionMergingNode(watcher, p,
3876 hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
3877 EventType.RS_ZK_REGION_MERGING) == -1) {
3878 byte[] data = ZKAssign.getData(watcher, encodedName);
3879 EventType currentType = null;
3880 if (data != null) {
3881 RegionTransition newRt = RegionTransition.parseFrom(data);
3882 currentType = newRt.getEventType();
3883 }
3884 if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3885 && currentType != EventType.RS_ZK_REGION_MERGING)) {
3886 LOG.warn("Failed to transition pending_merge node "
3887 + encodedName + " to merging, it's now " + currentType);
3888 return false;
3889 }
3890 }
3891 } catch (Exception e) {
3892 LOG.warn("Failed to transition pending_merge node "
3893 + encodedName + " to merging", e);
3894 return false;
3895 }
3896 }
3897
3898 synchronized (regionStates) {
3899 regionStates.updateRegionState(hri_a, State.MERGING);
3900 regionStates.updateRegionState(hri_b, State.MERGING);
3901 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3902
3903 if (et != EventType.RS_ZK_REGION_MERGED) {
3904 this.mergingRegions.put(encodedName,
3905 new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3906 } else {
3907 this.mergingRegions.remove(encodedName);
3908 regionOffline(hri_a, State.MERGED);
3909 regionOffline(hri_b, State.MERGED);
3910 regionOnline(p, sn);
3911 }
3912 }
3913
3914 if (et == EventType.RS_ZK_REGION_MERGED) {
3915 LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3916
3917 try {
3918 boolean successful = false;
3919 while (!successful) {
3920
3921
3922 successful = ZKAssign.deleteNode(watcher, encodedName,
3923 EventType.RS_ZK_REGION_MERGED, sn);
3924 }
3925 } catch (KeeperException e) {
3926 if (e instanceof NoNodeException) {
3927 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3928 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
3929 } else {
3930 server.abort("Error deleting MERGED node " + encodedName, e);
3931 }
3932 }
3933 LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3934 + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3935 + hri_b.getRegionNameAsString() + ", on " + sn);
3936
3937
3938 if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3939 unassign(p);
3940 }
3941 }
3942 return true;
3943 }
3944
3945
3946
3947
3948 private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3949 final String prettyPrintedRegionName, final ServerName sn) {
3950 if (!serverManager.isServerOnline(sn)) {
3951 LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3952 return false;
3953 }
3954 byte [] payloadOfSplitting = rt.getPayload();
3955 List<HRegionInfo> splittingRegions;
3956 try {
3957 splittingRegions = HRegionInfo.parseDelimitedFrom(
3958 payloadOfSplitting, 0, payloadOfSplitting.length);
3959 } catch (IOException e) {
3960 LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3961 + " payload for " + prettyPrintedRegionName);
3962 return false;
3963 }
3964 assert splittingRegions.size() == 2;
3965 HRegionInfo hri_a = splittingRegions.get(0);
3966 HRegionInfo hri_b = splittingRegions.get(1);
3967
3968 RegionState rs_p = regionStates.getRegionState(encodedName);
3969 RegionState rs_a = regionStates.getRegionState(hri_a);
3970 RegionState rs_b = regionStates.getRegionState(hri_b);
3971
3972 if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3973 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3974 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3975 LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
3976 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3977 return false;
3978 }
3979
3980 if (rs_p == null) {
3981
3982 rs_p = regionStates.updateRegionState(rt, State.OPEN);
3983 if (rs_p == null) {
3984 LOG.warn("Received splitting for region " + prettyPrintedRegionName
3985 + " from server " + sn + " but it doesn't exist anymore,"
3986 + " probably already processed its split");
3987 return false;
3988 }
3989 regionStates.regionOnline(rs_p.getRegion(), sn);
3990 }
3991
3992 HRegionInfo p = rs_p.getRegion();
3993 EventType et = rt.getEventType();
3994 if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3995 try {
3996 if (SplitTransaction.transitionSplittingNode(watcher, p,
3997 hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
3998 EventType.RS_ZK_REGION_SPLITTING) == -1) {
3999 byte[] data = ZKAssign.getData(watcher, encodedName);
4000 EventType currentType = null;
4001 if (data != null) {
4002 RegionTransition newRt = RegionTransition.parseFrom(data);
4003 currentType = newRt.getEventType();
4004 }
4005 if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
4006 && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4007 LOG.warn("Failed to transition pending_split node "
4008 + encodedName + " to splitting, it's now " + currentType);
4009 return false;
4010 }
4011 }
4012 } catch (Exception e) {
4013 LOG.warn("Failed to transition pending_split node "
4014 + encodedName + " to splitting", e);
4015 return false;
4016 }
4017 }
4018
4019 synchronized (regionStates) {
4020 splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4021 regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4022 regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4023 regionStates.updateRegionState(rt, State.SPLITTING);
4024
4025
4026
4027 if (TEST_SKIP_SPLIT_HANDLING) {
4028 LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4029 return true;
4030 }
4031
4032 if (et == EventType.RS_ZK_REGION_SPLIT) {
4033 regionOffline(p, State.SPLIT);
4034 regionOnline(hri_a, sn);
4035 regionOnline(hri_b, sn);
4036 splitRegions.remove(p);
4037 }
4038 }
4039
4040 if (et == EventType.RS_ZK_REGION_SPLIT) {
4041 LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4042
4043 try {
4044 boolean successful = false;
4045 while (!successful) {
4046
4047
4048 successful = ZKAssign.deleteNode(watcher, encodedName,
4049 EventType.RS_ZK_REGION_SPLIT, sn);
4050 }
4051 } catch (KeeperException e) {
4052 if (e instanceof NoNodeException) {
4053 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4054 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
4055 } else {
4056 server.abort("Error deleting SPLIT node " + encodedName, e);
4057 }
4058 }
4059 LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4060 + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4061 + hri_b.getRegionNameAsString() + ", on " + sn);
4062
4063
4064 if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
4065 unassign(hri_a);
4066 unassign(hri_b);
4067 }
4068 }
4069 return true;
4070 }
4071
4072
4073
4074
4075
4076
4077 private void regionOffline(final HRegionInfo regionInfo, final State state) {
4078 regionStates.regionOffline(regionInfo, state);
4079 removeClosedRegion(regionInfo);
4080
4081 clearRegionPlan(regionInfo);
4082 balancer.regionOffline(regionInfo);
4083
4084
4085 sendRegionClosedNotification(regionInfo);
4086 }
4087
4088 private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4089 final ServerName serverName) {
4090 if (!this.listeners.isEmpty()) {
4091 for (AssignmentListener listener : this.listeners) {
4092 listener.regionOpened(regionInfo, serverName);
4093 }
4094 }
4095 }
4096
4097 private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4098 if (!this.listeners.isEmpty()) {
4099 for (AssignmentListener listener : this.listeners) {
4100 listener.regionClosed(regionInfo);
4101 }
4102 }
4103 }
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149 protected String onRegionTransition(final ServerName serverName,
4150 final RegionStateTransition transition) {
4151 TransitionCode code = transition.getTransitionCode();
4152 HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4153 RegionState current = regionStates.getRegionState(hri);
4154 if (LOG.isDebugEnabled()) {
4155 LOG.debug("Got transition " + code + " for "
4156 + (current != null ? current.toString() : hri.getShortNameToLog())
4157 + " from " + serverName);
4158 }
4159 String errorMsg = null;
4160 switch (code) {
4161 case OPENED:
4162 if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4163 LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4164 + serverName);
4165 break;
4166 }
4167 case FAILED_OPEN:
4168 if (current == null
4169 || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4170 errorMsg = hri.getShortNameToLog()
4171 + " is not pending open on " + serverName;
4172 } else if (code == TransitionCode.FAILED_OPEN) {
4173 onRegionFailedOpen(hri, serverName);
4174 } else {
4175 long openSeqNum = HConstants.NO_SEQNUM;
4176 if (transition.hasOpenSeqNum()) {
4177 openSeqNum = transition.getOpenSeqNum();
4178 }
4179 if (openSeqNum < 0) {
4180 errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4181 } else {
4182 onRegionOpen(hri, serverName, openSeqNum);
4183 }
4184 }
4185 break;
4186
4187 case CLOSED:
4188 if (current == null
4189 || !current.isPendingCloseOrClosingOnServer(serverName)) {
4190 errorMsg = hri.getShortNameToLog()
4191 + " is not pending close on " + serverName;
4192 } else {
4193 onRegionClosed(hri);
4194 }
4195 break;
4196
4197 case READY_TO_SPLIT:
4198 case SPLIT_PONR:
4199 case SPLIT:
4200 case SPLIT_REVERTED:
4201 errorMsg = onRegionSplit(serverName, code, hri,
4202 HRegionInfo.convert(transition.getRegionInfo(1)),
4203 HRegionInfo.convert(transition.getRegionInfo(2)));
4204 break;
4205
4206 case READY_TO_MERGE:
4207 case MERGE_PONR:
4208 case MERGED:
4209 case MERGE_REVERTED:
4210 errorMsg = onRegionMerge(serverName, code, hri,
4211 HRegionInfo.convert(transition.getRegionInfo(1)),
4212 HRegionInfo.convert(transition.getRegionInfo(2)));
4213 break;
4214
4215 default:
4216 errorMsg = "Unexpected transition code " + code;
4217 }
4218 if (errorMsg != null) {
4219 LOG.error("Failed to transtion region from " + current + " to "
4220 + code + " by " + serverName + ": " + errorMsg);
4221 }
4222 return errorMsg;
4223 }
4224
4225
4226
4227
4228 public LoadBalancer getBalancer() {
4229 return this.balancer;
4230 }
4231 }