1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.Math;
24 import java.lang.Thread.UncaughtExceptionHandler;
25 import java.lang.annotation.Retention;
26 import java.lang.annotation.RetentionPolicy;
27 import java.lang.management.ManagementFactory;
28 import java.lang.management.MemoryUsage;
29 import java.lang.reflect.Constructor;
30 import java.net.BindException;
31 import java.net.InetSocketAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.Comparator;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Map.Entry;
43 import java.util.NavigableMap;
44 import java.util.Random;
45 import java.util.Set;
46 import java.util.SortedMap;
47 import java.util.TreeMap;
48 import java.util.TreeSet;
49 import java.util.concurrent.atomic.AtomicLong;
50 import java.util.concurrent.ConcurrentHashMap;
51 import java.util.concurrent.ConcurrentMap;
52 import java.util.concurrent.ConcurrentSkipListMap;
53 import java.util.concurrent.locks.ReentrantReadWriteLock;
54
55 import javax.management.ObjectName;
56
57 import com.google.common.annotations.VisibleForTesting;
58 import com.google.common.collect.Maps;
59
60 import org.apache.hadoop.hbase.util.ByteStringer;
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63 import org.apache.hadoop.hbase.classification.InterfaceAudience;
64 import org.apache.hadoop.conf.Configuration;
65 import org.apache.hadoop.fs.FileSystem;
66 import org.apache.hadoop.fs.Path;
67 import org.apache.hadoop.hbase.Cell;
68 import org.apache.hadoop.hbase.CellScannable;
69 import org.apache.hadoop.hbase.CellScanner;
70 import org.apache.hadoop.hbase.CellUtil;
71 import org.apache.hadoop.hbase.Chore;
72 import org.apache.hadoop.hbase.ClockOutOfSyncException;
73 import org.apache.hadoop.hbase.DoNotRetryIOException;
74 import org.apache.hadoop.hbase.HBaseConfiguration;
75 import org.apache.hadoop.hbase.HBaseIOException;
76 import org.apache.hadoop.hbase.HConstants;
77 import org.apache.hadoop.hbase.HRegionInfo;
78 import org.apache.hadoop.hbase.HTableDescriptor;
79 import org.apache.hadoop.hbase.HealthCheckChore;
80 import org.apache.hadoop.hbase.KeyValue;
81 import org.apache.hadoop.hbase.KeyValueUtil;
82 import org.apache.hadoop.hbase.NotServingRegionException;
83 import org.apache.hadoop.hbase.RemoteExceptionHandler;
84 import org.apache.hadoop.hbase.ServerName;
85 import org.apache.hadoop.hbase.Stoppable;
86 import org.apache.hadoop.hbase.TableDescriptors;
87 import org.apache.hadoop.hbase.TableName;
88 import org.apache.hadoop.hbase.UnknownScannerException;
89 import org.apache.hadoop.hbase.YouAreDeadException;
90 import org.apache.hadoop.hbase.ZNodeClearer;
91 import org.apache.hadoop.hbase.catalog.CatalogTracker;
92 import org.apache.hadoop.hbase.catalog.MetaEditor;
93 import org.apache.hadoop.hbase.catalog.MetaReader;
94 import org.apache.hadoop.hbase.client.Append;
95 import org.apache.hadoop.hbase.client.Delete;
96 import org.apache.hadoop.hbase.client.Get;
97 import org.apache.hadoop.hbase.client.HConnectionManager;
98 import org.apache.hadoop.hbase.client.Increment;
99 import org.apache.hadoop.hbase.client.Mutation;
100 import org.apache.hadoop.hbase.client.Put;
101 import org.apache.hadoop.hbase.client.Result;
102 import org.apache.hadoop.hbase.client.RowMutations;
103 import org.apache.hadoop.hbase.client.Scan;
104 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
105 import org.apache.hadoop.hbase.DroppedSnapshotException;
106 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
107 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
108 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
109 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
110 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
111 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
112 import org.apache.hadoop.hbase.executor.ExecutorService;
113 import org.apache.hadoop.hbase.executor.ExecutorType;
114 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
115 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
116 import org.apache.hadoop.hbase.fs.HFileSystem;
117 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
118 import org.apache.hadoop.hbase.io.hfile.HFile;
119 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
120 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
121 import org.apache.hadoop.hbase.ipc.PriorityFunction;
122 import org.apache.hadoop.hbase.ipc.RpcCallContext;
123 import org.apache.hadoop.hbase.ipc.RpcClient;
124 import org.apache.hadoop.hbase.ipc.RpcServer;
125 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
126 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
127 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
128 import org.apache.hadoop.hbase.ipc.ServerRpcController;
129 import org.apache.hadoop.hbase.master.RegionState.State;
130 import org.apache.hadoop.hbase.master.SplitLogManager;
131 import org.apache.hadoop.hbase.master.TableLockManager;
132 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
133 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
134 import org.apache.hadoop.hbase.protobuf.RequestConverter;
135 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
136 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
137 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
155 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
157 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
168 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
169 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
171 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
173 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
174 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
175 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
176 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
177 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
178 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
180 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
181 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
182 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
183 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
184 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
185 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
186 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
187 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
188 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
189 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
190 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
191 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
192 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
193 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
194 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
195 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
196 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
197 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
198 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
199 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
200 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
201 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
202 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
203 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
204 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
205 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
206 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
207 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
208 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
209 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
210 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
211 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
212 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
213 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
214 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
215 import org.apache.hadoop.hbase.regionserver.wal.HLog;
216 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
217 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
218 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
219 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
220 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
221 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
222 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
223 import org.apache.hadoop.hbase.security.UserProvider;
224 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
225 import org.apache.hadoop.hbase.util.Bytes;
226 import org.apache.hadoop.hbase.util.CompressionTest;
227 import org.apache.hadoop.hbase.util.ConfigUtil;
228 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
229 import org.apache.hadoop.hbase.util.FSTableDescriptors;
230 import org.apache.hadoop.hbase.util.FSUtils;
231 import org.apache.hadoop.hbase.util.InfoServer;
232 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
233 import org.apache.hadoop.hbase.util.Pair;
234 import org.apache.hadoop.hbase.util.Sleeper;
235 import org.apache.hadoop.hbase.util.Strings;
236 import org.apache.hadoop.hbase.util.Threads;
237 import org.apache.hadoop.hbase.util.VersionInfo;
238 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
239 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
240 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
241 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
242 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
243 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
244 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
245 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
246 import org.apache.hadoop.ipc.RemoteException;
247 import org.apache.hadoop.metrics.util.MBeanUtil;
248 import org.apache.hadoop.net.DNS;
249 import org.apache.hadoop.util.ReflectionUtils;
250 import org.apache.hadoop.util.StringUtils;
251 import org.apache.zookeeper.KeeperException;
252 import org.apache.zookeeper.KeeperException.NoNodeException;
253 import org.apache.zookeeper.data.Stat;
254 import org.cliffc.high_scale_lib.Counter;
255
256 import com.google.protobuf.BlockingRpcChannel;
257 import com.google.protobuf.ByteString;
258 import com.google.protobuf.Descriptors;
259 import com.google.protobuf.Message;
260 import com.google.protobuf.RpcCallback;
261 import com.google.protobuf.RpcController;
262 import com.google.protobuf.Service;
263 import com.google.protobuf.ServiceException;
264 import com.google.protobuf.TextFormat;
265
266
267
268
269
270 @InterfaceAudience.Private
271 @SuppressWarnings("deprecation")
272 public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
273 AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
274 HBaseRPCErrorHandler, LastSequenceId {
275
276 public static final Log LOG = LogFactory.getLog(HRegionServer.class);
277
278 private final Random rand;
279
280 private final AtomicLong scannerIdGen = new AtomicLong(0L);
281
282
283
284
285
286 protected static final String OPEN = "OPEN";
287 protected static final String CLOSE = "CLOSE";
288
289
290
291
292 protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
293 new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
294
295
296 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
297 "hbase.region.server.rpc.scheduler.factory.class";
298
299 protected long maxScannerResultSize;
300
301
302 protected MemStoreFlusher cacheFlusher;
303
304 protected HeapMemoryManager hMemManager;
305
306
307 protected CatalogTracker catalogTracker;
308
309
310 @SuppressWarnings("unused")
311 private RecoveringRegionWatcher recoveringRegionWatcher;
312
313
314
315
316 protected TableDescriptors tableDescriptors;
317
318
319 protected ReplicationSourceService replicationSourceHandler;
320 protected ReplicationSinkService replicationSinkHandler;
321
322
323 public CompactSplitThread compactSplitThread;
324
325 final ConcurrentHashMap<String, RegionScannerHolder> scanners =
326 new ConcurrentHashMap<String, RegionScannerHolder>();
327
328
329
330
331
332 protected final Map<String, HRegion> onlineRegions =
333 new ConcurrentHashMap<String, HRegion>();
334
335
336
337
338
339
340
341
342
343
344 protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
345 new ConcurrentHashMap<String, InetSocketAddress[]>();
346
347
348
349
350
351 protected final Map<String, HRegion> recoveringRegions = Collections
352 .synchronizedMap(new HashMap<String, HRegion>());
353
354
355 protected Leases leases;
356
357
358 protected ExecutorService service;
359
360
361 final Counter requestCount = new Counter();
362
363
364 protected volatile boolean fsOk;
365 protected HFileSystem fs;
366
367
368
369
370 protected volatile boolean stopped = false;
371
372
373
374 protected volatile boolean abortRequested;
375
376
377 private RegionServerInfo.Builder rsInfo;
378
379 ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
380
381
382
383 private boolean stopping = false;
384
385 private volatile boolean killed = false;
386
387 protected final Configuration conf;
388
389 private Path rootDir;
390
391 protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
392
393 final int numRetries;
394 protected final int threadWakeFrequency;
395 private final int msgInterval;
396
397 protected final int numRegionsToReport;
398
399
400 private volatile RegionServerStatusService.BlockingInterface rssStub;
401
402 RpcClient rpcClient;
403
404
405
406 RpcServerInterface rpcServer;
407
408 private final InetSocketAddress isa;
409 private UncaughtExceptionHandler uncaughtExceptionHandler;
410
411
412
413
414 InfoServer infoServer;
415 private JvmPauseMonitor pauseMonitor;
416
417
418 public static final String REGIONSERVER = "regionserver";
419
420
421 public static final String REGIONSERVER_CONF = "regionserver_conf";
422
423 private MetricsRegionServer metricsRegionServer;
424 private SpanReceiverHost spanReceiverHost;
425
426
427
428
429 Chore compactionChecker;
430
431
432
433
434 Chore periodicFlusher;
435
436
437
438 protected volatile HLog hlog;
439
440
441 protected volatile HLog hlogForMeta;
442
443 LogRoller hlogRoller;
444 LogRoller metaHLogRoller;
445
446
447 protected volatile boolean isOnline;
448
449
450 private ZooKeeperWatcher zooKeeper;
451
452
453 private MasterAddressTracker masterAddressTracker;
454
455
456 private ClusterStatusTracker clusterStatusTracker;
457
458
459 private SplitLogWorker splitLogWorker;
460
461
462 private final Sleeper sleeper;
463
464 private final int rpcTimeout;
465
466 private final RegionServerAccounting regionServerAccounting;
467
468
469 final CacheConfig cacheConfig;
470
471
472 private HealthCheckChore healthCheckChore;
473
474
475 private Chore nonceManagerChore;
476
477 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
478
479
480
481
482
483
484
485 private ServerName serverNameFromMasterPOV;
486
487
488
489
490 private final long startcode;
491
492
493
494
495 private String clusterId;
496
497
498
499
500 private ObjectName mxBean = null;
501
502
503
504
505 private MovedRegionsCleaner movedRegionsCleaner;
506
507
508
509
510 private final int scannerLeaseTimeoutPeriod;
511
512
513
514
515 private final PriorityFunction priority;
516
517 private RegionServerCoprocessorHost rsHost;
518
519 private RegionServerProcedureManagerHost rspmHost;
520
521
522 private TableLockManager tableLockManager;
523
524 private final boolean useZKForAssignment;
525
526
527 private ServerName serverName;
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547 private final ServerNonceManager nonceManager;
548
549 private UserProvider userProvider;
550
551
552
553
554
555
556
557
/**
 * Builds a region server from the supplied configuration.
 *
 * <p>The constructor verifies the configured compression codecs, reads the
 * tunables it caches in fields, creates the RPC server on the configured
 * host/port, performs the ZooKeeper-client and region-server logins, and
 * finally puts up the web UI, recording its port in the server info builder.
 *
 * @param conf configuration to read settings from; retained as {@code this.conf}
 * @throws IOException if a required compression codec is unsupported, or if
 *         one of the setup calls made here fails with an I/O error
 * @throws InterruptedException declared by the signature; raised by callees
 * @throws IllegalArgumentException if the bind address does not resolve or the
 *         RPC scheduler factory cannot be instantiated
 */
public HRegionServer(Configuration conf)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  this.isOnline = false;
  checkCodecs(this.conf);
  this.userProvider = UserProvider.instantiate(conf);

  FSUtils.setupShortCircuitRead(this.conf);

  // Cached configuration values.
  this.numRetries = this.conf.getInt(
      HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  // The main loop sleeps in msgInterval-sized steps.
  this.sleeper = new Sleeper(this.msgInterval, this);

  // A null nonce manager means nonces are disabled.
  if (conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true)) {
    this.nonceManager = new ServerNonceManager(this.conf);
  } else {
    this.nonceManager = null;
  }

  this.maxScannerResultSize = conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);

  this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

  this.rpcTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  // NOTE(review): two keys are passed here; presumably the second is a
  // fallback for the first -- confirm against HBaseConfiguration.getInt.
  this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

  // Work out the address the RPC server should bind to.
  final String bindHost = getHostname(conf);
  final int bindPort = conf.getInt(HConstants.REGIONSERVER_PORT,
      HConstants.DEFAULT_REGIONSERVER_PORT);
  final InetSocketAddress initialIsa = new InetSocketAddress(bindHost, bindPort);
  if (initialIsa.getAddress() == null) {
    throw new IllegalArgumentException("Failed resolve of " + initialIsa);
  }
  this.rand = new Random(initialIsa.hashCode());
  final String rpcServerName = "regionserver/" + initialIsa.toString();

  HConnectionManager.setServerSideHConnectionRetries(this.conf, rpcServerName, LOG);
  this.priority = new AnnotationReadingPriorityFunction(this);
  final RpcSchedulerFactory schedulerFactory = instantiateRpcSchedulerFactory(conf);

  this.rpcServer = new RpcServer(
      this,
      rpcServerName,
      getServices(),
      initialIsa,
      conf,
      schedulerFactory.create(conf, this));

  // Use the address the RPC server reports it is listening on.
  this.isa = this.rpcServer.getListenerAddress();

  this.rpcServer.setErrorHandler(this);
  this.startcode = System.currentTimeMillis();
  this.serverName = ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
  this.useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

  // ZooKeeper client login, then region server login, both keyed by our host name.
  ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
      "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
  userProvider.login("hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", this.isa.getHostName());

  this.regionServerAccounting = new RegionServerAccounting();
  this.cacheConfig = new CacheConfig(conf);

  // Any uncaught exception in a service thread aborts the whole server.
  this.uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  this.rsInfo = RegionServerInfo.newBuilder();

  // Put up the web UI and record the port it returned.
  this.rsInfo.setInfoPort(putUpWebUI());
}

/**
 * Instantiates the RPC scheduler factory configured under
 * {@link #REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}, defaulting to
 * {@link SimpleRpcSchedulerFactory}.
 *
 * @param conf configuration to read the factory class from
 * @return a new factory instance
 * @throws IllegalArgumentException if the class cannot be instantiated or accessed
 */
private static RpcSchedulerFactory instantiateRpcSchedulerFactory(Configuration conf) {
  final Class<?> factoryClass = conf.getClass(
      REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  try {
    return (RpcSchedulerFactory) factoryClass.newInstance();
  } catch (InstantiationException e) {
    throw new IllegalArgumentException(e);
  } catch (IllegalAccessException e) {
    throw new IllegalArgumentException(e);
  }
}
659
660 public static String getHostname(Configuration conf) throws UnknownHostException {
661 return conf.get("hbase.regionserver.ipc.address",
662 Strings.domainNamePointerToHostName(DNS.getDefaultHost(
663 conf.get("hbase.regionserver.dns.interface", "default"),
664 conf.get("hbase.regionserver.dns.nameserver", "default"))));
665 }
666
667 @Override
668 public boolean registerService(Service instance) {
669
670
671
672 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
673 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
674 LOG.error("Coprocessor service " + serviceDesc.getFullName()
675 + " already registered, rejecting request from " + instance);
676 return false;
677 }
678
679 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
680 if (LOG.isDebugEnabled()) {
681 LOG.debug("Registered regionserver coprocessor service: service=" + serviceDesc.getFullName());
682 }
683 return true;
684 }
685
686
687
688
689 private List<BlockingServiceAndInterface> getServices() {
690 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
691 bssi.add(new BlockingServiceAndInterface(
692 ClientProtos.ClientService.newReflectiveBlockingService(this),
693 ClientProtos.ClientService.BlockingInterface.class));
694 bssi.add(new BlockingServiceAndInterface(
695 AdminProtos.AdminService.newReflectiveBlockingService(this),
696 AdminProtos.AdminService.BlockingInterface.class));
697 return bssi;
698 }
699
700
701
702
703
704
705 private static void checkCodecs(final Configuration c) throws IOException {
706
707 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
708 if (codecs == null) return;
709 for (String codec : codecs) {
710 if (!CompressionTest.testCompression(codec)) {
711 throw new IOException("Compression codec " + codec +
712 " not supported, aborting RS construction");
713 }
714 }
715 }
716
717 String getClusterId() {
718 return this.clusterId;
719 }
720
721 @Override
722 public int getPriority(RequestHeader header, Message param) {
723 return priority.getPriority(header, param);
724 }
725
726 @Retention(RetentionPolicy.RUNTIME)
727 protected @interface QosPriority {
728 int priority() default 0;
729 }
730
731 PriorityFunction getPriority() {
732 return priority;
733 }
734
735 RegionScanner getScanner(long scannerId) {
736 String scannerIdString = Long.toString(scannerId);
737 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
738 if (scannerHolder != null) {
739 return scannerHolder.s;
740 }
741 return null;
742 }
743
744
745
746
747
748
749
750 private void preRegistrationInitialization(){
751 try {
752 initializeZooKeeper();
753 initializeThreads();
754 } catch (Throwable t) {
755
756
757 this.rpcServer.stop();
758 abort("Initialization of RS failed. Hence aborting RS.", t);
759 }
760 }
761
762
763
764
765
766
767
768
769
770 private void initializeZooKeeper() throws IOException, InterruptedException {
771
772 this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
773 this.isa.getPort(), this);
774
775
776
777
778 this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
779 this.masterAddressTracker.start();
780 blockAndCheckIfStopped(this.masterAddressTracker);
781
782
783
784 this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
785 this.clusterStatusTracker.start();
786 blockAndCheckIfStopped(this.clusterStatusTracker);
787
788
789 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this);
790 catalogTracker.start();
791
792
793
794
795 try {
796 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
797 if (clusterId == null) {
798 this.abort("Cluster ID has not been set");
799 }
800 LOG.info("ClusterId : "+clusterId);
801 } catch (KeeperException e) {
802 this.abort("Failed to retrieve Cluster ID",e);
803 }
804
805
806 try {
807 rspmHost = new RegionServerProcedureManagerHost();
808 rspmHost.loadProcedures(conf);
809 rspmHost.initialize(this);
810 } catch (KeeperException e) {
811 this.abort("Failed to reach zk cluster when creating procedure handler.", e);
812 }
813 this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
814 ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
815
816
817 this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
818 }
819
820
821
822
823
824
825
826
827 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
828 throws IOException, InterruptedException {
829 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
830 if (this.stopped) {
831 throw new IOException("Received the shutdown message while waiting.");
832 }
833 }
834 }
835
836
837
838
839 private boolean isClusterUp() {
840 return this.clusterStatusTracker.isClusterUp();
841 }
842
843 private void initializeThreads() throws IOException {
844
845 this.cacheFlusher = new MemStoreFlusher(conf, this);
846
847
848 this.compactSplitThread = new CompactSplitThread(this);
849
850
851
852 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
853 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
854
855 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
856 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
857 if (isHealthCheckerConfigured()) {
858 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
859 }
860
861 this.leases = new Leases(this.threadWakeFrequency);
862
863
864 movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
865
866 if (this.nonceManager != null) {
867
868 nonceManagerChore = this.nonceManager.createCleanupChore(this);
869 }
870
871
872 rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
873 this.isa.getAddress(), 0));
874 this.pauseMonitor = new JvmPauseMonitor(conf);
875 pauseMonitor.start();
876 }
877
878
879
880
/**
 * Body of the region server thread. Runs pre-registration initialization, reports for duty
 * until a startup response arrives, then loops sending periodic server reports while the
 * server is neither stopped nor unhealthy. After the loop exits (normally or via an abort)
 * the rest of the method shuts services down, closes regions and the WAL, and deletes this
 * server's ephemeral node.
 */
@Override
public void run() {
  try {
    // Initialization that has to complete before this server reports for duty.
    preRegistrationInitialization();
  } catch (Throwable e) {
    // abort() does not return early from run(); execution continues with the block below.
    abort("Fatal exception during initialization", e);
  }

  try {
    // Keep reporting for duty until a response is received or keepLooping() turns false.
    while (keepLooping()) {
      RegionServerStartupResponse w = reportForDuty();
      if (w == null) {
        LOG.warn("reportForDuty failed; sleeping and then retrying.");
        this.sleeper.sleep();
      } else {
        handleReportForDutyResponse(w);
        break;
      }
    }

    // The coprocessor host is created only after the report-for-duty loop has finished.
    this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

    if (!this.stopped && isHealthy()){
      // rspmHost is started only when the server is neither stopped nor unhealthy.
      rspmHost.start();
    }

    // lastMsg: time the last report was sent; oldRequestCount: write count seen on the
    // previous pass while waiting for catalog regions to go quiet.
    long lastMsg = 0;
    long oldRequestCount = -1;
    // Main service loop.
    while (!this.stopped && isHealthy()) {
      if (!isClusterUp()) {
        if (isOnlineRegionsEmpty()) {
          stop("Exiting; cluster shutdown set and not carrying any regions");
        } else if (!this.stopping) {
          // First pass after the cluster went down: start closing user regions.
          this.stopping = true;
          LOG.info("Closing user regions");
          closeUserRegions(this.abortRequested);
        } else if (this.stopping) {
          boolean allUserRegionsOffline = areAllUserRegionsOffline();
          if (allUserRegionsOffline) {
            // Only non-user regions remain. Stop once the write request count is the same
            // on two consecutive passes.
            if (oldRequestCount == getWriteRequestCount()) {
              stop("Stopped; only catalog regions remaining online");
              break;
            }
            oldRequestCount = getWriteRequestCount();
          } else {
            // User regions are still online; request their close again.
            closeUserRegions(this.abortRequested);
          }
          LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
        }
      }
      long now = System.currentTimeMillis();
      // Send a report to the master once at least msgInterval has elapsed since the last one.
      if ((now - lastMsg) >= msgInterval) {
        tryRegionServerReport(lastMsg, now);
        lastMsg = System.currentTimeMillis();
      }
      if (!this.stopped) this.sleeper.sleep();
    }
  } catch (Throwable t) {
    // Anything escaping the loop aborts the server unless checkOOME() reports it handled it.
    if (!checkOOME(t)) {
      String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
      abort(prefix + t.getMessage(), t);
    }
  }

  // ---- Shutdown sequence; every step below runs regardless of how the loop ended. ----
  if (mxBean != null) {
    MBeanUtil.unregisterMBean(mxBean);
    mxBean = null;
  }
  if (this.leases != null) this.leases.closeAfterLeasesExpire();
  this.rpcServer.stop();
  if (this.splitLogWorker != null) {
    splitLogWorker.stop();
  }
  if (this.infoServer != null) {
    LOG.info("Stopping infoServer");
    try {
      this.infoServer.stop();
    } catch (Exception e) {
      // NOTE(review): failure is only printed to stderr, not routed through LOG.
      e.printStackTrace();
    }
  }

  if (cacheConfig.isBlockCacheEnabled()) {
    cacheConfig.getBlockCache().shutdown();
  }

  if (movedRegionsCleaner != null) {
    movedRegionsCleaner.stop("Region Server stopping");
  }

  // Stop or interrupt the background workers; they are joined later in join().
  if (this.hMemManager != null) this.hMemManager.stop();
  if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
  if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
  if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
  if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
  if (this.compactionChecker != null)
    this.compactionChecker.interrupt();
  if (this.healthCheckChore != null) {
    this.healthCheckChore.interrupt();
  }
  if (this.nonceManagerChore != null) {
    this.nonceManagerChore.interrupt();
  }

  // Stop rspmHost, passing true when the server was aborted or killed.
  if (rspmHost != null) {
    rspmHost.stop(this.abortRequested || this.killed);
  }

  if (this.killed) {
    // Killed: deliberately do nothing here; regions are not closed.
  } else if (abortRequested) {
    // Aborting: user regions are closed only while the filesystem is still considered OK.
    if (this.fsOk) {
      closeUserRegions(abortRequested);
    }
    LOG.info("aborting server " + this.serverNameFromMasterPOV);
  } else {
    // Regular stop: close user regions and all open scanners.
    closeUserRegions(abortRequested);
    closeAllScanners();
    LOG.info("stopping server " + this.serverNameFromMasterPOV);
  }

  if (this.catalogTracker != null) this.catalogTracker.stop();

  // Close meta regions last, and only when not killed and (not aborting or filesystem OK).
  if (!this.killed && containsMetaTableRegions()) {
    if (!abortRequested || this.fsOk) {
      if (this.compactSplitThread != null) {
        // Wait for the compact/split thread before closing meta; the field is nulled so
        // join() below does not wait on it a second time.
        this.compactSplitThread.join();
        this.compactSplitThread = null;
      }
      closeMetaTableRegions(abortRequested);
    }
  }

  if (!this.killed && this.fsOk) {
    waitOnAllRegionsToClose(abortRequested);
    LOG.info("stopping server " + this.serverNameFromMasterPOV +
        "; all regions closed.");
  }

  // The WAL is closed only if the filesystem is OK; it is also deleted unless aborting.
  if (this.fsOk) {
    closeWAL(!abortRequested);
  }

  // Drop the master stub reference and stop the remaining client-side helpers.
  if (this.rssStub != null) {
    this.rssStub = null;
  }
  if (this.rpcClient != null) {
    this.rpcClient.stop();
  }
  if (this.leases != null) {
    this.leases.close();
  }
  if (this.pauseMonitor != null) {
    this.pauseMonitor.stop();
  }

  // A killed server does not wait for its service threads.
  if (!killed) {
    join();
  }

  try {
    deleteMyEphemeralNode();
  } catch (KeeperException e) {
    LOG.warn("Failed deleting my ephemeral node", e);
  }

  // Remove the on-disk record of the ephemeral node, then close the ZooKeeper connection.
  ZNodeClearer.deleteMyEphemeralNodeOnDisk();
  if (this.zooKeeper != null) {
    this.zooKeeper.close();
  }
  LOG.info("stopping server " + this.serverNameFromMasterPOV +
      "; zookeeper connection closed.");

  LOG.info(Thread.currentThread().getName() + " exiting");
}
1080
1081 private boolean containsMetaTableRegions() {
1082 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1083 }
1084
1085 private boolean areAllUserRegionsOffline() {
1086 if (getNumberOfOnlineRegions() > 2) return false;
1087 boolean allUserRegionsOffline = true;
1088 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1089 if (!e.getValue().getRegionInfo().isMetaTable()) {
1090 allUserRegionsOffline = false;
1091 break;
1092 }
1093 }
1094 return allUserRegionsOffline;
1095 }
1096
1097
1098
1099
1100 private long getWriteRequestCount() {
1101 int writeCount = 0;
1102 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1103 writeCount += e.getValue().getWriteRequestsCount();
1104 }
1105 return writeCount;
1106 }
1107
1108 @VisibleForTesting
1109 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1110 throws IOException {
1111 RegionServerStatusService.BlockingInterface rss = rssStub;
1112 if (rss == null) {
1113
1114 return;
1115 }
1116 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1117 try {
1118 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1119 ServerName sn = ServerName.parseVersionedServerName(
1120 this.serverNameFromMasterPOV.getVersionedBytes());
1121 request.setServer(ProtobufUtil.toServerName(sn));
1122 request.setLoad(sl);
1123 rss.regionServerReport(null, request.build());
1124 } catch (ServiceException se) {
1125 IOException ioe = ProtobufUtil.getRemoteException(se);
1126 if (ioe instanceof YouAreDeadException) {
1127
1128 throw ioe;
1129 }
1130 if (rssStub == rss) {
1131 rssStub = null;
1132 }
1133
1134
1135 createRegionServerStatusStub();
1136 }
1137 }
1138
1139 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
1140
1141
1142
1143
1144
1145
1146
1147 MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
1148 Collection<HRegion> regions = getOnlineRegionsLocalContext();
1149 MemoryUsage memory =
1150 ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1151
1152 ClusterStatusProtos.ServerLoad.Builder serverLoad =
1153 ClusterStatusProtos.ServerLoad.newBuilder();
1154 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1155 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1156 serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1157 serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1158 Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
1159 for (String coprocessor : coprocessors) {
1160 serverLoad.addCoprocessors(
1161 Coprocessor.newBuilder().setName(coprocessor).build());
1162 }
1163 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1164 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1165 for (HRegion region : regions) {
1166 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1167 }
1168 serverLoad.setReportStartTime(reportStartTime);
1169 serverLoad.setReportEndTime(reportEndTime);
1170 if (this.infoServer != null) {
1171 serverLoad.setInfoServerPort(this.infoServer.getPort());
1172 } else {
1173 serverLoad.setInfoServerPort(-1);
1174 }
1175
1176
1177
1178 ReplicationSourceService rsources = getReplicationSourceService();
1179
1180 if (rsources != null) {
1181
1182 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1183 if (rLoad != null) {
1184 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1185 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1186 serverLoad.addReplLoadSource(rLS);
1187 }
1188 }
1189 }
1190
1191 return serverLoad.build();
1192 }
1193
1194 String getOnlineRegionsAsPrintableString() {
1195 StringBuilder sb = new StringBuilder();
1196 for (HRegion r: this.onlineRegions.values()) {
1197 if (sb.length() > 0) sb.append(", ");
1198 sb.append(r.getRegionInfo().getEncodedName());
1199 }
1200 return sb.toString();
1201 }
1202
1203
1204
1205
/**
 * Polls until {@code onlineRegions} is empty, requesting a close for every online region
 * that is not in transition and has not already been asked to close by this method. Gives
 * up early when no region is in transition any more.
 *
 * @param abort forwarded to {@code closeRegionIgnoreErrors} for each region closed here
 */
private void waitOnAllRegionsToClose(final boolean abort) {
  // lastCount / previousLogTime throttle the progress log below.
  int lastCount = -1;
  long previousLogTime = 0;
  // Encoded names of regions this method has already requested to close.
  Set<String> closedRegions = new HashSet<String>();
  while (!isOnlineRegionsEmpty()) {
    int count = getNumberOfOnlineRegions();
    // Log only when the count changed ...
    if (count != lastCount) {
      // ... and at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        previousLogTime = System.currentTimeMillis();
        lastCount = count;
        LOG.info("Waiting on " + count + " regions to close");
        // With fewer than ten regions left, dump the whole map at debug level.
        if (count < 10 && LOG.isDebugEnabled()) {
          LOG.debug(this.onlineRegions);
        }
      }
    }

    // Request a close for each online region that is not in transition and that we have not
    // asked to close before.
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      HRegionInfo hri = e.getValue().getRegionInfo();
      if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
          && !closedRegions.contains(hri.getEncodedName())) {
        closedRegions.add(hri.getEncodedName());
        // Errors are ignored by the helper; such a region may stay online.
        closeRegionIgnoreErrors(hri, abort);
      }
    }
    // Nothing in transition: stop waiting even if some regions are still online.
    if (this.regionsInTransitionInRS.isEmpty()) {
      if (!isOnlineRegionsEmpty()) {
        LOG.info("We were exiting though online regions are not empty," +
            " because some regions failed closing");
      }
      break;
    }
    Threads.sleep(200);
  }
}
1250
1251 private void closeWAL(final boolean delete) {
1252 if (this.hlogForMeta != null) {
1253
1254
1255
1256
1257 try {
1258 this.hlogForMeta.close();
1259 } catch (Throwable e) {
1260 LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1261 }
1262 }
1263 if (this.hlog != null) {
1264 try {
1265 if (delete) {
1266 hlog.closeAndDelete();
1267 } else {
1268 hlog.close();
1269 }
1270 } catch (Throwable e) {
1271 LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1272 }
1273 }
1274 }
1275
1276 private void closeAllScanners() {
1277
1278
1279 for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
1280 try {
1281 e.getValue().s.close();
1282 } catch (IOException ioe) {
1283 LOG.warn("Closing scanner " + e.getKey(), ioe);
1284 }
1285 }
1286 }
1287
1288
1289
1290
1291
1292
/**
 * Processes the master's answer to our report-for-duty call: applies the configuration
 * entries it carries, records the server name as the master sees it, creates the ephemeral
 * node, sets up the filesystem, table descriptors, WAL and metrics, and starts the service
 * threads. On success {@code isOnline} is set to true.
 *
 * @param c the startup response received from the master
 * @throws IOException if any step fails; the server is marked offline and stopped first
 */
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
throws IOException {
  try {
    for (NameStringPair e : c.getMapEntriesList()) {
      String key = e.getName();
      // The hostname entry is special: it defines our server name from the master's point of
      // view and is not copied into the configuration.
      if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
        String hostnameFromMasterPOV = e.getValue();
        this.serverNameFromMasterPOV = ServerName.valueOf(hostnameFromMasterPOV,
            this.isa.getPort(), this.startcode);
        if (!hostnameFromMasterPOV.equals(this.isa.getHostName())) {
          LOG.info("Master passed us a different hostname to use; was=" +
              this.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
        }
        continue;
      }
      // Every other entry overrides the local configuration.
      String value = e.getValue();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Config from master: " + key + "=" + value);
      }
      this.conf.set(key, value);
    }

    // Provide a default "mapred.task.id" derived from our server name when none is set.
    if (this.conf.get("mapred.task.id") == null) {
      this.conf.set("mapred.task.id", "hb_rs_" +
        this.serverNameFromMasterPOV.toString());
    }
    // Create our ephemeral node in ZooKeeper.
    createMyEphemeralNode();

    // Record the ephemeral node path on disk as well.
    ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

    // Make the HBase root directory the default filesystem in the (possibly just updated)
    // configuration before any filesystem object is created.
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));

    // HBase-level checksum verification defaults to enabled.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.conf, this.fs, this.rootDir, true, false);
    this.hlog = setupWALAndReplication();
    // Metrics are created after the WAL has been set up.
    this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));

    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    startServiceThreads();
    startHeapMemoryManager();
    LOG.info("Serving as " + this.serverNameFromMasterPOV +
      ", RpcServer on " + this.isa +
      ", sessionid=0x" +
      Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
    isOnline = true;
  } catch (Throwable e) {
    // Any failure leaves the server offline and stopped; the cause is rethrown as IOException.
    this.isOnline = false;
    stop("Failed initialization");
    throw convertThrowableToIOE(cleanup(e, "Failed init"),
        "Region server startup failed");
  } finally {
    // Success or failure, make the sleeper skip its next sleep cycle.
    sleeper.skipSleepCycle();
  }
}
1362
1363 private void startHeapMemoryManager() {
1364 this.hMemManager = HeapMemoryManager.create(this);
1365 if (this.hMemManager != null) {
1366 this.hMemManager.start();
1367 }
1368 }
1369
1370 private void createMyEphemeralNode() throws KeeperException, IOException {
1371 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1372 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1373 getMyEphemeralNodePath(), data);
1374 }
1375
1376 private void deleteMyEphemeralNode() throws KeeperException {
1377 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1378 }
1379
1380 @Override
1381 public RegionServerAccounting getRegionServerAccounting() {
1382 return regionServerAccounting;
1383 }
1384
1385 @Override
1386 public TableLockManager getTableLockManager() {
1387 return tableLockManager;
1388 }
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398 private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1399 RegionSpecifier.Builder regionSpecifier) {
1400 byte[] name = r.getRegionName();
1401 int stores = 0;
1402 int storefiles = 0;
1403 int storeUncompressedSizeMB = 0;
1404 int storefileSizeMB = 0;
1405 int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1406 int storefileIndexSizeMB = 0;
1407 int rootIndexSizeKB = 0;
1408 int totalStaticIndexSizeKB = 0;
1409 int totalStaticBloomSizeKB = 0;
1410 long totalCompactingKVs = 0;
1411 long currentCompactedKVs = 0;
1412 synchronized (r.stores) {
1413 stores += r.stores.size();
1414 for (Store store : r.stores.values()) {
1415 storefiles += store.getStorefilesCount();
1416 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1417 / 1024 / 1024);
1418 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1419 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1420 CompactionProgress progress = store.getCompactionProgress();
1421 if (progress != null) {
1422 totalCompactingKVs += progress.totalCompactingKVs;
1423 currentCompactedKVs += progress.currentCompactedKVs;
1424 }
1425
1426 rootIndexSizeKB +=
1427 (int) (store.getStorefilesIndexSize() / 1024);
1428
1429 totalStaticIndexSizeKB +=
1430 (int) (store.getTotalStaticIndexSize() / 1024);
1431
1432 totalStaticBloomSizeKB +=
1433 (int) (store.getTotalStaticBloomSize() / 1024);
1434 }
1435 }
1436 float dataLocality =
1437 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1438 if (regionLoadBldr == null) {
1439 regionLoadBldr = RegionLoad.newBuilder();
1440 }
1441 if (regionSpecifier == null) {
1442 regionSpecifier = RegionSpecifier.newBuilder();
1443 }
1444 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1445 regionSpecifier.setValue(ByteStringer.wrap(name));
1446 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1447 .setStores(stores)
1448 .setStorefiles(storefiles)
1449 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1450 .setStorefileSizeMB(storefileSizeMB)
1451 .setMemstoreSizeMB(memstoreSizeMB)
1452 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1453 .setRootIndexSizeKB(rootIndexSizeKB)
1454 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1455 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1456 .setReadRequestsCount(r.readRequestsCount.get())
1457 .setWriteRequestsCount(r.writeRequestsCount.get())
1458 .setTotalCompactingKVs(totalCompactingKVs)
1459 .setCurrentCompactedKVs(currentCompactedKVs)
1460 .setCompleteSequenceId(r.completeSequenceId)
1461 .setDataLocality(dataLocality);
1462
1463 return regionLoadBldr.build();
1464 }
1465
1466
1467
1468
1469
1470 public RegionLoad createRegionLoad(final String encodedRegionName) {
1471 HRegion r = null;
1472 r = this.onlineRegions.get(encodedRegionName);
1473 return r != null ? createRegionLoad(r, null, null) : null;
1474 }
1475
1476
1477
1478
1479 private static class CompactionChecker extends Chore {
1480 private final HRegionServer instance;
1481 private final int majorCompactPriority;
1482 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1483 private long iteration = 0;
1484
1485 CompactionChecker(final HRegionServer h, final int sleepTime,
1486 final Stoppable stopper) {
1487 super("CompactionChecker", sleepTime, h);
1488 this.instance = h;
1489 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1490
1491
1492
1493
1494 this.majorCompactPriority = this.instance.conf.
1495 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1496 DEFAULT_PRIORITY);
1497 }
1498
1499 @Override
1500 protected void chore() {
1501 for (HRegion r : this.instance.onlineRegions.values()) {
1502 if (r == null)
1503 continue;
1504 for (Store s : r.getStores().values()) {
1505 try {
1506 long multiplier = s.getCompactionCheckMultiplier();
1507 assert multiplier > 0;
1508 if (iteration % multiplier != 0) continue;
1509 if (s.needsCompaction()) {
1510
1511 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1512 + " requests compaction");
1513 } else if (s.isMajorCompaction()) {
1514 if (majorCompactPriority == DEFAULT_PRIORITY
1515 || majorCompactPriority > r.getCompactPriority()) {
1516 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1517 + " requests major compaction; use default priority", null);
1518 } else {
1519 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1520 + " requests major compaction; use configured priority",
1521 this.majorCompactPriority, null);
1522 }
1523 }
1524 } catch (IOException e) {
1525 LOG.warn("Failed major compaction check on " + r, e);
1526 }
1527 }
1528 }
1529 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1530 }
1531 }
1532
1533 class PeriodicMemstoreFlusher extends Chore {
1534 final HRegionServer server;
1535 final static int RANGE_OF_DELAY = 20000;
1536 final static int MIN_DELAY_TIME = 3000;
1537 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1538 super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1539 this.server = server;
1540 }
1541
1542 @Override
1543 protected void chore() {
1544 for (HRegion r : this.server.onlineRegions.values()) {
1545 if (r == null)
1546 continue;
1547 if (r.shouldFlush()) {
1548 FlushRequester requester = server.getFlushRequester();
1549 if (requester != null) {
1550 long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1551 LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1552 " after a delay of " + randomDelay);
1553
1554
1555
1556 requester.requestDelayedFlush(r, randomDelay);
1557 }
1558 }
1559 }
1560 }
1561 }
1562
1563
1564
1565
1566
1567
1568
1569
1570 public boolean isOnline() {
1571 return isOnline;
1572 }
1573
1574
1575
1576
1577
1578
1579
1580 private HLog setupWALAndReplication() throws IOException {
1581 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1582 final String logName
1583 = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1584
1585 Path logdir = new Path(rootDir, logName);
1586 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1587 if (this.fs.exists(logdir)) {
1588 throw new RegionServerRunningException("Region server has already " +
1589 "created directory at " + this.serverNameFromMasterPOV.toString());
1590 }
1591
1592
1593
1594 createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1595
1596 return instantiateHLog(rootDir, logName);
1597 }
1598
1599 private HLog getMetaWAL() throws IOException {
1600 if (this.hlogForMeta != null) return this.hlogForMeta;
1601 final String logName = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1602 Path logdir = new Path(rootDir, logName);
1603 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1604 this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
1605 this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString());
1606 return this.hlogForMeta;
1607 }
1608
1609
1610
1611
1612
1613
1614
1615
1616 protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
1617 return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
1618 getWALActionListeners(), this.serverNameFromMasterPOV.toString());
1619 }
1620
1621
1622
1623
1624
1625
1626
1627 protected List<WALActionsListener> getWALActionListeners() {
1628 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1629
1630 this.hlogRoller = new LogRoller(this, this);
1631 listeners.add(this.hlogRoller);
1632 if (this.replicationSourceHandler != null &&
1633 this.replicationSourceHandler.getWALActionsListener() != null) {
1634
1635 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1636 }
1637 return listeners;
1638 }
1639
1640 protected List<WALActionsListener> getMetaWALActionListeners() {
1641 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1642
1643
1644 MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1645 String n = Thread.currentThread().getName();
1646 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1647 n + "-MetaLogRoller", uncaughtExceptionHandler);
1648 this.metaHLogRoller = tmpLogRoller;
1649 tmpLogRoller = null;
1650 listeners.add(this.metaHLogRoller);
1651 return listeners;
1652 }
1653
1654 protected LogRoller getLogRoller() {
1655 return hlogRoller;
1656 }
1657
1658 public MetricsRegionServer getMetrics() {
1659 return this.metricsRegionServer;
1660 }
1661
1662
1663
1664
1665 public MasterAddressTracker getMasterAddressTracker() {
1666 return this.masterAddressTracker;
1667 }
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
/**
 * Starts the executor pools and background threads of this server: the executor service
 * with its per-type pools, the log roller, cache flusher, compaction checker, periodic
 * flusher, optional health and nonce chores, the lease checker, replication service(s), the
 * RPC server and finally the split log worker.
 *
 * @throws IOException declared by the method; see callers for handling
 */
private void startServiceThreads() throws IOException {
  // The current thread's name prefixes the names of the threads started here.
  String n = Thread.currentThread().getName();
  // Start the executor service and one pool per executor type; sizes come from configuration.
  this.service = new ExecutorService(getServerName().toShortString());
  this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
    conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_OPEN_META,
    conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
    conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
    conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
  // The parallel-seek pool exists only when the feature is switched on.
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
      conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
  }
  this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
    conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));

  // Background workers run as daemon threads sharing the same uncaught exception handler.
  Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
      uncaughtExceptionHandler);
  this.cacheFlusher.start(uncaughtExceptionHandler);
  Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
    ".compactionChecker", uncaughtExceptionHandler);
  Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
      ".periodicFlusher", uncaughtExceptionHandler);
  // The health checker and nonce cleaner are optional.
  if (this.healthCheckChore != null) {
    Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
          uncaughtExceptionHandler);
  }
  if (this.nonceManagerChore != null) {
    Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), n + ".nonceCleaner",
          uncaughtExceptionHandler);
  }

  // Name and start the lease checker.
  this.leases.setName(n + ".leaseChecker");
  this.leases.start();

  // When source and sink are the same object, start it only once.
  if (this.replicationSourceHandler == this.replicationSinkHandler &&
      this.replicationSourceHandler != null) {
    this.replicationSourceHandler.startReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.startReplicationService();
    }
  }

  // The RPC server is started after the workers above.
  this.rpcServer.start();

  // The split log worker gets its own copy of the configuration with dedicated retry count,
  // RPC timeout and server-side retries multiplier.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
    conf.getInt("hbase.log.replay.retries.number", 8));
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
    conf.getInt("hbase.log.replay.rpc.timeout", 30000));
  sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
  this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
  splitLogWorker.start();
}
1750
1751
1752
1753
1754
1755
1756 private int putUpWebUI() throws IOException {
1757 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
1758
1759 if (port < 0) return port;
1760 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1761
1762 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1763 false);
1764 while (true) {
1765 try {
1766 this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
1767 this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
1768 this.infoServer.addServlet("dump", "/dump", RSDumpServlet.class);
1769 this.infoServer.setAttribute(REGIONSERVER, this);
1770 this.infoServer.setAttribute(REGIONSERVER_CONF, conf);
1771 this.infoServer.start();
1772 break;
1773 } catch (BindException e) {
1774 if (!auto) {
1775
1776 LOG.error("Failed binding http info server to port: " + port);
1777 throw e;
1778 }
1779
1780 LOG.info("Failed binding http info server to port: " + port);
1781 port++;
1782 }
1783 }
1784 return this.infoServer.getPort();
1785 }
1786
1787
1788
1789
1790 private boolean isHealthy() {
1791 if (!fsOk) {
1792
1793 return false;
1794 }
1795
1796 if (!(leases.isAlive()
1797 && cacheFlusher.isAlive() && hlogRoller.isAlive()
1798 && this.compactionChecker.isAlive()
1799 && this.periodicFlusher.isAlive())) {
1800 stop("One or more threads are no longer alive -- stop");
1801 return false;
1802 }
1803 if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1804 stop("Meta HLog roller thread is no longer alive -- stop");
1805 return false;
1806 }
1807 return true;
1808 }
1809
1810 public HLog getWAL() {
1811 try {
1812 return getWAL(null);
1813 } catch (IOException e) {
1814 LOG.warn("getWAL threw exception " + e);
1815 return null;
1816 }
1817 }
1818
1819 @Override
1820 public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1821
1822
1823
1824
1825 if (regionInfo != null && regionInfo.isMetaTable()) {
1826 return getMetaWAL();
1827 }
1828 return this.hlog;
1829 }
1830
1831 @Override
1832 public CatalogTracker getCatalogTracker() {
1833 return this.catalogTracker;
1834 }
1835
1836 @Override
1837 public void stop(final String msg) {
1838 if (!this.stopped) {
1839 try {
1840 if (this.rsHost != null) {
1841 this.rsHost.preStop(msg);
1842 }
1843 this.stopped = true;
1844 LOG.info("STOPPED: " + msg);
1845
1846 sleeper.skipSleepCycle();
1847 } catch (IOException exp) {
1848 LOG.warn("The region server did not stop", exp);
1849 }
1850 }
1851 }
1852
1853 public void waitForServerOnline(){
1854 while (!isOnline() && !isStopped()){
1855 sleeper.sleep();
1856 }
1857 }
1858
1859 @Override
1860 public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
1861 throws KeeperException, IOException {
1862 checkOpen();
1863 LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString());
1864
1865 for (Store s : r.getStores().values()) {
1866 if (s.hasReferences() || s.needsCompaction()) {
1867 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1868 }
1869 }
1870 long openSeqNum = r.getOpenSeqNum();
1871 if (openSeqNum == HConstants.NO_SEQNUM) {
1872
1873 LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1874 openSeqNum = 0;
1875 }
1876
1877
1878 updateRecoveringRegionLastFlushedSequenceId(r);
1879
1880 if (useZKForAssignment) {
1881 if (r.getRegionInfo().isMetaRegion()) {
1882 LOG.info("Updating zk with meta location");
1883
1884
1885 MetaRegionTracker.setMetaLocation(getZooKeeper(), this.serverNameFromMasterPOV, State.OPEN);
1886 } else {
1887 MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), this.serverNameFromMasterPOV,
1888 openSeqNum);
1889 }
1890 }
1891 if (!useZKForAssignment
1892 && !reportRegionStateTransition(TransitionCode.OPENED, openSeqNum, r.getRegionInfo())) {
1893 throw new IOException("Failed to report opened region to master: "
1894 + r.getRegionNameAsString());
1895 }
1896
1897 LOG.info("Finished post open deploy task for " + r.getRegionNameAsString());
1898
1899 }
1900
1901 @Override
1902 public RpcServerInterface getRpcServer() {
1903 return rpcServer;
1904 }
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916 @Override
1917 public void abort(String reason, Throwable cause) {
1918 String msg = "ABORTING region server " + this + ": " + reason;
1919 if (cause != null) {
1920 LOG.fatal(msg, cause);
1921 } else {
1922 LOG.fatal(msg);
1923 }
1924 this.abortRequested = true;
1925
1926
1927
1928 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
1929 CoprocessorHost.getLoadedCoprocessors());
1930
1931 try {
1932 if (cause != null) {
1933 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
1934 }
1935
1936 if (rssStub != null && this.serverNameFromMasterPOV != null) {
1937 ReportRSFatalErrorRequest.Builder builder =
1938 ReportRSFatalErrorRequest.newBuilder();
1939 ServerName sn =
1940 ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
1941 builder.setServer(ProtobufUtil.toServerName(sn));
1942 builder.setErrorMessage(msg);
1943 rssStub.reportRSFatalError(null, builder.build());
1944 }
1945 } catch (Throwable t) {
1946 LOG.warn("Unable to report fatal error to master", t);
1947 }
1948 stop(reason);
1949 }
1950
1951
1952
1953
1954 public void abort(String reason) {
1955 abort(reason, null);
1956 }
1957
1958 @Override
1959 public boolean isAborted() {
1960 return this.abortRequested;
1961 }
1962
1963
1964
1965
1966
1967
1968 protected void kill() {
1969 this.killed = true;
1970 abort("Simulated kill");
1971 }
1972
1973
1974
1975
1976
/**
 * Shuts down or waits for the service threads and helpers of this server: chores, flusher,
 * span receivers, log rollers, the compact/split thread, the executor service and the
 * replication service(s). Each component is skipped when it is null.
 */
protected void join() {
  if (this.nonceManagerChore != null) {
    Threads.shutdown(this.nonceManagerChore.getThread());
  }
  if (this.compactionChecker != null) {
    Threads.shutdown(this.compactionChecker.getThread());
  }
  if (this.periodicFlusher != null) {
    Threads.shutdown(this.periodicFlusher.getThread());
  }
  if (this.cacheFlusher != null) {
    this.cacheFlusher.join();
  }
  if (this.healthCheckChore != null) {
    Threads.shutdown(this.healthCheckChore.getThread());
  }
  if (this.spanReceiverHost != null) {
    this.spanReceiverHost.closeReceivers();
  }
  if (this.hlogRoller != null) {
    Threads.shutdown(this.hlogRoller.getThread());
  }
  if (this.metaHLogRoller != null) {
    Threads.shutdown(this.metaHLogRoller.getThread());
  }
  if (this.compactSplitThread != null) {
    this.compactSplitThread.join();
  }
  if (this.service != null) this.service.shutdown();
  // When source and sink are the same object, stop it only once.
  if (this.replicationSourceHandler != null &&
      this.replicationSourceHandler == this.replicationSinkHandler) {
    this.replicationSourceHandler.stopReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.stopReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.stopReplicationService();
    }
  }
}
2018
2019 @Override
2020 public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
2021 return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
2022 }
2023
2024 @Override
2025 public boolean reportRegionStateTransition(TransitionCode code, long openSeqNum, HRegionInfo... hris) {
2026 ReportRegionStateTransitionRequest.Builder builder = ReportRegionStateTransitionRequest.newBuilder();
2027 builder.setServer(ProtobufUtil.toServerName(serverName));
2028 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2029 transition.setTransitionCode(code);
2030 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2031 transition.setOpenSeqNum(openSeqNum);
2032 }
2033 for (HRegionInfo hri : hris) {
2034 transition.addRegionInfo(HRegionInfo.convert(hri));
2035 }
2036 ReportRegionStateTransitionRequest request = builder.build();
2037 while (keepLooping()) {
2038 RegionServerStatusService.BlockingInterface rss = rssStub;
2039 try {
2040 if (rss == null) {
2041 createRegionServerStatusStub();
2042 continue;
2043 }
2044 ReportRegionStateTransitionResponse response = rss.reportRegionStateTransition(null, request);
2045 if (response.hasErrorMessage()) {
2046 LOG.info("Failed to transition " + hris[0] + " to " + code + ": "
2047 + response.getErrorMessage());
2048 return false;
2049 }
2050 return true;
2051 } catch (ServiceException se) {
2052 IOException ioe = ProtobufUtil.getRemoteException(se);
2053 LOG.info("Failed to report region transition, will retry", ioe);
2054 if (rssStub == rss) {
2055 rssStub = null;
2056 }
2057 }
2058 }
2059 return false;
2060 }
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070 @VisibleForTesting
2071 protected synchronized ServerName
2072 createRegionServerStatusStub() {
2073 if (rssStub != null) {
2074 return masterAddressTracker.getMasterAddress();
2075 }
2076 ServerName sn = null;
2077 long previousLogTime = 0;
2078 RegionServerStatusService.BlockingInterface master = null;
2079 boolean refresh = false;
2080 RegionServerStatusService.BlockingInterface intf = null;
2081 while (keepLooping() && master == null) {
2082 sn = this.masterAddressTracker.getMasterAddress(refresh);
2083 if (sn == null) {
2084 if (!keepLooping()) {
2085
2086 LOG.debug("No master found and cluster is stopped; bailing out");
2087 return null;
2088 }
2089 LOG.debug("No master found; retry");
2090 previousLogTime = System.currentTimeMillis();
2091 refresh = true;
2092 sleeper.sleep();
2093 continue;
2094 }
2095
2096 new InetSocketAddress(sn.getHostname(), sn.getPort());
2097 try {
2098 BlockingRpcChannel channel =
2099 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout);
2100 intf = RegionServerStatusService.newBlockingStub(channel);
2101 break;
2102 } catch (IOException e) {
2103 e = e instanceof RemoteException ?
2104 ((RemoteException)e).unwrapRemoteException() : e;
2105 if (e instanceof ServerNotRunningYetException) {
2106 if (System.currentTimeMillis() > (previousLogTime+1000)){
2107 LOG.info("Master isn't available yet, retrying");
2108 previousLogTime = System.currentTimeMillis();
2109 }
2110 } else {
2111 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2112 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2113 previousLogTime = System.currentTimeMillis();
2114 }
2115 }
2116 try {
2117 Thread.sleep(200);
2118 } catch (InterruptedException ignored) {
2119 }
2120 }
2121 }
2122 rssStub = intf;
2123 return sn;
2124 }
2125
2126
2127
2128
2129
2130 private boolean keepLooping() {
2131 return !this.stopped && isClusterUp();
2132 }
2133
2134
2135
2136
2137
2138
2139
2140
2141 private RegionServerStartupResponse reportForDuty() throws IOException {
2142 ServerName masterServerName = createRegionServerStatusStub();
2143 if (masterServerName == null) return null;
2144 RegionServerStartupResponse result = null;
2145 try {
2146 this.requestCount.set(0);
2147 LOG.info("reportForDuty to master=" + masterServerName + " with port=" + this.isa.getPort() +
2148 ", startcode=" + this.startcode);
2149 long now = EnvironmentEdgeManager.currentTimeMillis();
2150 int port = this.isa.getPort();
2151 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2152 request.setPort(port);
2153 request.setServerStartCode(this.startcode);
2154 request.setServerCurrentTime(now);
2155 result = this.rssStub.regionServerStartup(null, request.build());
2156 } catch (ServiceException se) {
2157 IOException ioe = ProtobufUtil.getRemoteException(se);
2158 if (ioe instanceof ClockOutOfSyncException) {
2159 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2160
2161 throw ioe;
2162 } else if (ioe instanceof ServerNotRunningYetException) {
2163 LOG.debug("Master is not running yet");
2164 } else {
2165 LOG.warn("error telling master we are up", se);
2166 rssStub = null;
2167 }
2168 }
2169 return result;
2170 }
2171
2172 @Override
2173 public long getLastSequenceId(byte[] encodedRegionName) {
2174 long lastFlushedSequenceId = -1L;
2175 try {
2176 GetLastFlushedSequenceIdRequest req = RequestConverter
2177 .buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2178 lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
2179 .getLastFlushedSequenceId();
2180 } catch (ServiceException e) {
2181 lastFlushedSequenceId = -1L;
2182 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
2183 }
2184 return lastFlushedSequenceId;
2185 }
2186
2187
2188
2189
2190
2191
2192 protected void closeAllRegions(final boolean abort) {
2193 closeUserRegions(abort);
2194 closeMetaTableRegions(abort);
2195 }
2196
2197
2198
2199
2200
2201 void closeMetaTableRegions(final boolean abort) {
2202 HRegion meta = null;
2203 this.lock.writeLock().lock();
2204 try {
2205 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2206 HRegionInfo hri = e.getValue().getRegionInfo();
2207 if (hri.isMetaRegion()) {
2208 meta = e.getValue();
2209 }
2210 if (meta != null) break;
2211 }
2212 } finally {
2213 this.lock.writeLock().unlock();
2214 }
2215 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2216 }
2217
2218
2219
2220
2221
2222
2223
2224 void closeUserRegions(final boolean abort) {
2225 this.lock.writeLock().lock();
2226 try {
2227 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2228 HRegion r = e.getValue();
2229 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2230
2231 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2232 }
2233 }
2234 } finally {
2235 this.lock.writeLock().unlock();
2236 }
2237 }
2238
2239
2240 public InfoServer getInfoServer() {
2241 return infoServer;
2242 }
2243
2244
2245
2246
2247 @Override
2248 public boolean isStopped() {
2249 return this.stopped;
2250 }
2251
2252 @Override
2253 public boolean isStopping() {
2254 return this.stopping;
2255 }
2256
2257 @Override
2258 public Map<String, HRegion> getRecoveringRegions() {
2259 return this.recoveringRegions;
2260 }
2261
2262
2263
2264
2265
2266 @Override
2267 public Configuration getConfiguration() {
2268 return conf;
2269 }
2270
2271
2272 ReentrantReadWriteLock.WriteLock getWriteLock() {
2273 return lock.writeLock();
2274 }
2275
2276 public int getNumberOfOnlineRegions() {
2277 return this.onlineRegions.size();
2278 }
2279
2280 boolean isOnlineRegionsEmpty() {
2281 return this.onlineRegions.isEmpty();
2282 }
2283
2284
2285
2286
2287
2288
2289 public Collection<HRegion> getOnlineRegionsLocalContext() {
2290 Collection<HRegion> regions = this.onlineRegions.values();
2291 return Collections.unmodifiableCollection(regions);
2292 }
2293
2294 @Override
2295 public void addToOnlineRegions(HRegion region) {
2296 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2297 }
2298
2299
2300
2301
2302
2303
2304 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2305
2306 SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2307 new Comparator<Long>() {
2308 @Override
2309 public int compare(Long a, Long b) {
2310 return -1 * a.compareTo(b);
2311 }
2312 });
2313
2314 for (HRegion region : this.onlineRegions.values()) {
2315 sortedRegions.put(region.memstoreSize.get(), region);
2316 }
2317 return sortedRegions;
2318 }
2319
2320
2321
2322
2323 public long getStartcode() {
2324 return this.startcode;
2325 }
2326
2327
2328 @Override
2329 public FlushRequester getFlushRequester() {
2330 return this.cacheFlusher;
2331 }
2332
2333
2334
2335
2336
2337
2338
2339 protected HRegionInfo[] getMostLoadedRegions() {
2340 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2341 for (HRegion r : onlineRegions.values()) {
2342 if (!r.isAvailable()) {
2343 continue;
2344 }
2345 if (regions.size() < numRegionsToReport) {
2346 regions.add(r.getRegionInfo());
2347 } else {
2348 break;
2349 }
2350 }
2351 return regions.toArray(new HRegionInfo[regions.size()]);
2352 }
2353
2354 @Override
2355 public Leases getLeases() {
2356 return leases;
2357 }
2358
2359
2360
2361
2362 protected Path getRootDir() {
2363 return rootDir;
2364 }
2365
2366
2367
2368
2369 @Override
2370 public FileSystem getFileSystem() {
2371 return fs;
2372 }
2373
2374 @Override
2375 public String toString() {
2376 return getServerName().toString();
2377 }
2378
2379
2380
2381
2382
2383
2384 public int getThreadWakeFrequency() {
2385 return threadWakeFrequency;
2386 }
2387
2388 @Override
2389 public ZooKeeperWatcher getZooKeeper() {
2390 return zooKeeper;
2391 }
2392
2393 @Override
2394 public ServerName getServerName() {
2395
2396 return this.serverNameFromMasterPOV == null?
2397 ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), this.startcode) :
2398 this.serverNameFromMasterPOV;
2399 }
2400
2401 @Override
2402 public CompactionRequestor getCompactionRequester() {
2403 return this.compactSplitThread;
2404 }
2405
2406 public ZooKeeperWatcher getZooKeeperWatcher() {
2407 return this.zooKeeper;
2408 }
2409
2410 public RegionServerCoprocessorHost getCoprocessorHost(){
2411 return this.rsHost;
2412 }
2413
2414 @Override
2415 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2416 return this.regionsInTransitionInRS;
2417 }
2418
2419 @Override
2420 public ExecutorService getExecutorService() {
2421 return service;
2422 }
2423
2424
2425
2426
2427
2428
2429
2430
2431 static private void createNewReplicationInstance(Configuration conf,
2432 HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2433
2434
2435 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2436 HConstants.REPLICATION_ENABLE_DEFAULT)) {
2437 return;
2438 }
2439
2440
2441 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2442 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2443
2444
2445 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2446 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2447
2448
2449
2450 if (sourceClassname.equals(sinkClassname)) {
2451 server.replicationSourceHandler = (ReplicationSourceService)
2452 newReplicationInstance(sourceClassname,
2453 conf, server, fs, logDir, oldLogDir);
2454 server.replicationSinkHandler = (ReplicationSinkService)
2455 server.replicationSourceHandler;
2456 } else {
2457 server.replicationSourceHandler = (ReplicationSourceService)
2458 newReplicationInstance(sourceClassname,
2459 conf, server, fs, logDir, oldLogDir);
2460 server.replicationSinkHandler = (ReplicationSinkService)
2461 newReplicationInstance(sinkClassname,
2462 conf, server, fs, logDir, oldLogDir);
2463 }
2464 }
2465
2466 static private ReplicationService newReplicationInstance(String classname,
2467 Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2468 Path oldLogDir) throws IOException{
2469
2470 Class<?> clazz = null;
2471 try {
2472 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2473 clazz = Class.forName(classname, true, classLoader);
2474 } catch (java.lang.ClassNotFoundException nfe) {
2475 throw new IOException("Could not find class for " + classname);
2476 }
2477
2478
2479 ReplicationService service = (ReplicationService)
2480 ReflectionUtils.newInstance(clazz, conf);
2481 service.initialize(server, fs, logDir, oldLogDir);
2482 return service;
2483 }
2484
2485
2486
2487
2488
2489
2490 public static Thread startRegionServer(final HRegionServer hrs)
2491 throws IOException {
2492 return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
2493 }
2494
2495
2496
2497
2498
2499
2500
2501 public static Thread startRegionServer(final HRegionServer hrs,
2502 final String name) throws IOException {
2503 Thread t = new Thread(hrs);
2504 t.setName(name);
2505 t.start();
2506
2507
2508 ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
2509 .getConfiguration()), hrs, t);
2510 return t;
2511 }
2512
2513
2514
2515
2516
2517
2518
2519
2520 public static HRegionServer constructRegionServer(
2521 Class<? extends HRegionServer> regionServerClass,
2522 final Configuration conf2) {
2523 try {
2524 Constructor<? extends HRegionServer> c = regionServerClass
2525 .getConstructor(Configuration.class);
2526 return c.newInstance(conf2);
2527 } catch (Exception e) {
2528 throw new RuntimeException("Failed construction of " + "Regionserver: "
2529 + regionServerClass.toString(), e);
2530 }
2531 }
2532
2533
2534
2535
2536 public static void main(String[] args) throws Exception {
2537 VersionInfo.logVersion();
2538 Configuration conf = HBaseConfiguration.create();
2539 @SuppressWarnings("unchecked")
2540 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2541 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2542
2543 new HRegionServerCommandLine(regionServerClass).doMain(args);
2544 }
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556 @Override
2557 public List<HRegion> getOnlineRegions(TableName tableName) {
2558 List<HRegion> tableRegions = new ArrayList<HRegion>();
2559 synchronized (this.onlineRegions) {
2560 for (HRegion region: this.onlineRegions.values()) {
2561 HRegionInfo regionInfo = region.getRegionInfo();
2562 if(regionInfo.getTable().equals(tableName)) {
2563 tableRegions.add(region);
2564 }
2565 }
2566 }
2567 return tableRegions;
2568 }
2569
2570
2571 public String[] getCoprocessors() {
2572 TreeSet<String> coprocessors = new TreeSet<String>(
2573 this.hlog.getCoprocessorHost().getCoprocessors());
2574 Collection<HRegion> regions = getOnlineRegionsLocalContext();
2575 for (HRegion region: regions) {
2576 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2577 }
2578 return coprocessors.toArray(new String[coprocessors.size()]);
2579 }
2580
2581
2582
2583
2584
2585 private class ScannerListener implements LeaseListener {
2586 private final String scannerName;
2587
2588 ScannerListener(final String n) {
2589 this.scannerName = n;
2590 }
2591
2592 @Override
2593 public void leaseExpired() {
2594 RegionScannerHolder rsh = scanners.remove(this.scannerName);
2595 if (rsh != null) {
2596 RegionScanner s = rsh.s;
2597 LOG.info("Scanner " + this.scannerName + " lease expired on region "
2598 + s.getRegionInfo().getRegionNameAsString());
2599 try {
2600 HRegion region = getRegion(s.getRegionInfo().getRegionName());
2601 if (region != null && region.getCoprocessorHost() != null) {
2602 region.getCoprocessorHost().preScannerClose(s);
2603 }
2604
2605 s.close();
2606 if (region != null && region.getCoprocessorHost() != null) {
2607 region.getCoprocessorHost().postScannerClose(s);
2608 }
2609 } catch (IOException e) {
2610 LOG.error("Closing scanner for "
2611 + s.getRegionInfo().getRegionNameAsString(), e);
2612 }
2613 } else {
2614 LOG.info("Scanner " + this.scannerName + " lease expired");
2615 }
2616 }
2617 }
2618
2619
2620
2621
2622
2623
2624 protected void checkOpen() throws IOException {
2625 if (this.stopped || this.abortRequested) {
2626 throw new RegionServerStoppedException("Server " + getServerName() +
2627 " not running" + (this.abortRequested ? ", aborting" : ""));
2628 }
2629 if (!fsOk) {
2630 throw new RegionServerStoppedException("File system not available");
2631 }
2632 }
2633
2634
2635
2636
2637
2638
2639 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2640 try {
2641 if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
2642 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2643 " - ignoring and continuing");
2644 }
2645 } catch (IOException e) {
2646 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2647 " - ignoring and continuing", e);
2648 }
2649 }
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675 protected boolean closeRegion(String encodedName, final boolean abort,
2676 final boolean zk, final int versionOfClosingNode, final ServerName sn)
2677 throws NotServingRegionException, RegionAlreadyInTransitionException {
2678
2679 HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2680 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2681 try {
2682 actualRegion.getCoprocessorHost().preClose(false);
2683 } catch (IOException exp) {
2684 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2685 return false;
2686 }
2687 }
2688
2689 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2690 Boolean.FALSE);
2691
2692 if (Boolean.TRUE.equals(previous)) {
2693 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2694 "trying to OPEN. Cancelling OPENING.");
2695 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2696
2697
2698 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2699 " Doing a standard close now");
2700 return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
2701 }
2702
2703 actualRegion = this.getFromOnlineRegions(encodedName);
2704 if (actualRegion == null) {
2705 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2706
2707 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2708 " was opening but not yet served. Opening is cancelled.");
2709 }
2710 } else if (Boolean.FALSE.equals(previous)) {
2711 LOG.info("Received CLOSE for the region: " + encodedName +
2712 " ,which we are already trying to CLOSE, but not completed yet");
2713
2714
2715
2716
2717
2718
2719 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2720 " was already closing. New CLOSE request is ignored.");
2721 }
2722
2723 if (actualRegion == null) {
2724 LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2725 this.regionsInTransitionInRS.remove(encodedName.getBytes());
2726
2727 throw new NotServingRegionException("The region " + encodedName +
2728 " is not online, and is not opening.");
2729 }
2730
2731 CloseRegionHandler crh;
2732 final HRegionInfo hri = actualRegion.getRegionInfo();
2733 if (hri.isMetaRegion()) {
2734 crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
2735 } else {
2736 crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
2737 }
2738 this.service.submit(crh);
2739 return true;
2740 }
2741
2742
2743
2744
2745
2746
2747 public HRegion getOnlineRegion(final byte[] regionName) {
2748 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2749 return this.onlineRegions.get(encodedRegionName);
2750 }
2751
2752 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2753 return this.regionFavoredNodesMap.get(encodedRegionName);
2754 }
2755
2756 @Override
2757 public HRegion getFromOnlineRegions(final String encodedRegionName) {
2758 return this.onlineRegions.get(encodedRegionName);
2759 }
2760
2761
2762 @Override
2763 public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2764 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2765
2766 if (destination != null) {
2767 HLog wal = getWAL();
2768 long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2769 if (closeSeqNum == HConstants.NO_SEQNUM) {
2770
2771 closeSeqNum = r.getOpenSeqNum();
2772 if (closeSeqNum == HConstants.NO_SEQNUM) {
2773 closeSeqNum = 0;
2774 }
2775 }
2776 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2777 }
2778 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2779 return toReturn != null;
2780 }
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790 protected HRegion getRegion(final byte[] regionName)
2791 throws NotServingRegionException {
2792 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2793 return getRegionByEncodedName(regionName, encodedRegionName);
2794 }
2795
2796 protected HRegion getRegionByEncodedName(String encodedRegionName)
2797 throws NotServingRegionException {
2798 return getRegionByEncodedName(null, encodedRegionName);
2799 }
2800
2801 protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2802 throws NotServingRegionException {
2803 HRegion region = this.onlineRegions.get(encodedRegionName);
2804 if (region == null) {
2805 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2806 if (moveInfo != null) {
2807 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2808 }
2809 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2810 String regionNameStr = regionName == null?
2811 encodedRegionName: Bytes.toStringBinary(regionName);
2812 if (isOpening != null && isOpening.booleanValue()) {
2813 throw new RegionOpeningException("Region " + regionNameStr +
2814 " is opening on " + this.serverNameFromMasterPOV);
2815 }
2816 throw new NotServingRegionException("Region " + regionNameStr +
2817 " is not online on " + this.serverNameFromMasterPOV);
2818 }
2819 return region;
2820 }
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830 protected Throwable cleanup(final Throwable t) {
2831 return cleanup(t, null);
2832 }
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844 protected Throwable cleanup(final Throwable t, final String msg) {
2845
2846 if (t instanceof NotServingRegionException) {
2847 LOG.debug("NotServingRegionException; " + t.getMessage());
2848 return t;
2849 }
2850 if (msg == null) {
2851 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2852 } else {
2853 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2854 }
2855 if (!checkOOME(t)) {
2856 checkFileSystem();
2857 }
2858 return t;
2859 }
2860
2861
2862
2863
2864
2865
2866
2867
2868 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2869 return (t instanceof IOException ? (IOException) t : msg == null
2870 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2871 }
2872
2873
2874
2875
2876
2877
2878
2879
2880 @Override
2881 public boolean checkOOME(final Throwable e) {
2882 boolean stop = false;
2883 try {
2884 if (e instanceof OutOfMemoryError
2885 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
2886 || (e.getMessage() != null && e.getMessage().contains(
2887 "java.lang.OutOfMemoryError"))) {
2888 stop = true;
2889 LOG.fatal(
2890 "Run out of memory; HRegionServer will abort itself immediately", e);
2891 }
2892 } finally {
2893 if (stop) {
2894 Runtime.getRuntime().halt(1);
2895 }
2896 }
2897 return stop;
2898 }
2899
2900
2901
2902
2903
2904
2905
2906 public boolean checkFileSystem() {
2907 if (this.fsOk && this.fs != null) {
2908 try {
2909 FSUtils.checkFileSystemAvailable(this.fs);
2910 } catch (IOException e) {
2911 abort("File System not available", e);
2912 this.fsOk = false;
2913 }
2914 }
2915 return this.fsOk;
2916 }
2917
2918 protected long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
2919 long scannerId = this.scannerIdGen.incrementAndGet();
2920 String scannerName = String.valueOf(scannerId);
2921
2922 RegionScannerHolder existing =
2923 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
2924 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
2925
2926 this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
2927 new ScannerListener(scannerName));
2928
2929 return scannerId;
2930 }
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941 @Override
2942 public GetResponse get(final RpcController controller,
2943 final GetRequest request) throws ServiceException {
2944 long before = EnvironmentEdgeManager.currentTimeMillis();
2945 try {
2946 checkOpen();
2947 requestCount.increment();
2948 HRegion region = getRegion(request.getRegion());
2949
2950 GetResponse.Builder builder = GetResponse.newBuilder();
2951 ClientProtos.Get get = request.getGet();
2952 Boolean existence = null;
2953 Result r = null;
2954
2955 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2956 if (get.getColumnCount() != 1) {
2957 throw new DoNotRetryIOException(
2958 "get ClosestRowBefore supports one and only one family now, not "
2959 + get.getColumnCount() + " families");
2960 }
2961 byte[] row = get.getRow().toByteArray();
2962 byte[] family = get.getColumn(0).getFamily().toByteArray();
2963 r = region.getClosestRowBefore(row, family);
2964 } else {
2965 Get clientGet = ProtobufUtil.toGet(get);
2966 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2967 existence = region.getCoprocessorHost().preExists(clientGet);
2968 }
2969 if (existence == null) {
2970 r = region.get(clientGet);
2971 if (get.getExistenceOnly()) {
2972 boolean exists = r.getExists();
2973 if (region.getCoprocessorHost() != null) {
2974 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2975 }
2976 existence = exists;
2977 }
2978 }
2979 }
2980 if (existence != null){
2981 ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
2982 builder.setResult(pbr);
2983 } else if (r != null) {
2984 ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2985 builder.setResult(pbr);
2986 }
2987 return builder.build();
2988 } catch (IOException ie) {
2989 throw new ServiceException(ie);
2990 } finally {
2991 metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
2992 }
2993 }
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003 @Override
3004 public MutateResponse mutate(final RpcController rpcc,
3005 final MutateRequest request) throws ServiceException {
3006
3007
3008 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
3009 CellScanner cellScanner = controller != null? controller.cellScanner(): null;
3010
3011 if (controller != null) controller.setCellScanner(null);
3012 try {
3013 checkOpen();
3014 requestCount.increment();
3015 HRegion region = getRegion(request.getRegion());
3016 MutateResponse.Builder builder = MutateResponse.newBuilder();
3017 MutationProto mutation = request.getMutation();
3018 if (!region.getRegionInfo().isMetaTable()) {
3019 cacheFlusher.reclaimMemStoreMemory();
3020 }
3021 long nonceGroup = request.hasNonceGroup()
3022 ? request.getNonceGroup() : HConstants.NO_NONCE;
3023 Result r = null;
3024 Boolean processed = null;
3025 MutationType type = mutation.getMutateType();
3026 switch (type) {
3027 case APPEND:
3028
3029 r = append(region, mutation, cellScanner, nonceGroup);
3030 break;
3031 case INCREMENT:
3032
3033 r = increment(region, mutation, cellScanner, nonceGroup);
3034 break;
3035 case PUT:
3036 Put put = ProtobufUtil.toPut(mutation, cellScanner);
3037 if (request.hasCondition()) {
3038 Condition condition = request.getCondition();
3039 byte[] row = condition.getRow().toByteArray();
3040 byte[] family = condition.getFamily().toByteArray();
3041 byte[] qualifier = condition.getQualifier().toByteArray();
3042 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
3043 ByteArrayComparable comparator =
3044 ProtobufUtil.toComparator(condition.getComparator());
3045 if (region.getCoprocessorHost() != null) {
3046 processed = region.getCoprocessorHost().preCheckAndPut(
3047 row, family, qualifier, compareOp, comparator, put);
3048 }
3049 if (processed == null) {
3050 boolean result = region.checkAndMutate(row, family,
3051 qualifier, compareOp, comparator, put, true);
3052 if (region.getCoprocessorHost() != null) {
3053 result = region.getCoprocessorHost().postCheckAndPut(row, family,
3054 qualifier, compareOp, comparator, put, result);
3055 }
3056 processed = result;
3057 }
3058 } else {
3059 region.put(put);
3060 processed = Boolean.TRUE;
3061 }
3062 break;
3063 case DELETE:
3064 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3065 if (request.hasCondition()) {
3066 Condition condition = request.getCondition();
3067 byte[] row = condition.getRow().toByteArray();
3068 byte[] family = condition.getFamily().toByteArray();
3069 byte[] qualifier = condition.getQualifier().toByteArray();
3070 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
3071 ByteArrayComparable comparator =
3072 ProtobufUtil.toComparator(condition.getComparator());
3073 if (region.getCoprocessorHost() != null) {
3074 processed = region.getCoprocessorHost().preCheckAndDelete(
3075 row, family, qualifier, compareOp, comparator, delete);
3076 }
3077 if (processed == null) {
3078 boolean result = region.checkAndMutate(row, family,
3079 qualifier, compareOp, comparator, delete, true);
3080 if (region.getCoprocessorHost() != null) {
3081 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
3082 qualifier, compareOp, comparator, delete, result);
3083 }
3084 processed = result;
3085 }
3086 } else {
3087 region.delete(delete);
3088 processed = Boolean.TRUE;
3089 }
3090 break;
3091 default:
3092 throw new DoNotRetryIOException(
3093 "Unsupported mutate type: " + type.name());
3094 }
3095 if (processed != null) builder.setProcessed(processed.booleanValue());
3096 addResult(builder, r, controller);
3097 return builder.build();
3098 } catch (IOException ie) {
3099 checkFileSystem();
3100 throw new ServiceException(ie);
3101 }
3102 }
3103
3104
3105
3106
3107
3108 private boolean isClientCellBlockSupport() {
3109 RpcCallContext context = RpcServer.getCurrentCall();
3110 return context != null && context.isClientCellBlockSupport();
3111 }
3112
3113 private void addResult(final MutateResponse.Builder builder,
3114 final Result result, final PayloadCarryingRpcController rpcc) {
3115 if (result == null) return;
3116 if (isClientCellBlockSupport()) {
3117 builder.setResult(ProtobufUtil.toResultNoData(result));
3118 rpcc.setCellScanner(result.cellScanner());
3119 } else {
3120 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
3121 builder.setResult(pbr);
3122 }
3123 }
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136 @Override
3137 public ScanResponse scan(final RpcController controller, final ScanRequest request)
3138 throws ServiceException {
3139 Leases.Lease lease = null;
3140 String scannerName = null;
3141 try {
3142 if (!request.hasScannerId() && !request.hasScan()) {
3143 throw new DoNotRetryIOException(
3144 "Missing required input: scannerId or scan");
3145 }
3146 long scannerId = -1;
3147 if (request.hasScannerId()) {
3148 scannerId = request.getScannerId();
3149 scannerName = String.valueOf(scannerId);
3150 }
3151 try {
3152 checkOpen();
3153 } catch (IOException e) {
3154
3155
3156 if (scannerName != null) {
3157 try {
3158 leases.cancelLease(scannerName);
3159 } catch (LeaseException le) {
3160 LOG.info("Server shutting down and client tried to access missing scanner " +
3161 scannerName);
3162 }
3163 }
3164 throw e;
3165 }
3166 requestCount.increment();
3167
3168 int ttl = 0;
3169 HRegion region = null;
3170 RegionScanner scanner = null;
3171 RegionScannerHolder rsh = null;
3172 boolean moreResults = true;
3173 boolean closeScanner = false;
3174 ScanResponse.Builder builder = ScanResponse.newBuilder();
3175 if (request.hasCloseScanner()) {
3176 closeScanner = request.getCloseScanner();
3177 }
3178 int rows = closeScanner ? 0 : 1;
3179 if (request.hasNumberOfRows()) {
3180 rows = request.getNumberOfRows();
3181 }
3182 if (request.hasScannerId()) {
3183 rsh = scanners.get(scannerName);
3184 if (rsh == null) {
3185 LOG.info("Client tried to access missing scanner " + scannerName);
3186 throw new UnknownScannerException(
3187 "Name: " + scannerName + ", already closed?");
3188 }
3189 scanner = rsh.s;
3190 HRegionInfo hri = scanner.getRegionInfo();
3191 region = getRegion(hri.getRegionName());
3192 if (region != rsh.r) {
3193 throw new NotServingRegionException("Region was re-opened after the scanner"
3194 + scannerName + " was created: " + hri.getRegionNameAsString());
3195 }
3196 } else {
3197 region = getRegion(request.getRegion());
3198 ClientProtos.Scan protoScan = request.getScan();
3199 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3200 Scan scan = ProtobufUtil.toScan(protoScan);
3201
3202 if (!isLoadingCfsOnDemandSet) {
3203 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3204 }
3205 scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
3206 region.prepareScanner(scan);
3207 if (region.getCoprocessorHost() != null) {
3208 scanner = region.getCoprocessorHost().preScannerOpen(scan);
3209 }
3210 if (scanner == null) {
3211 scanner = region.getScanner(scan);
3212 }
3213 if (region.getCoprocessorHost() != null) {
3214 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3215 }
3216 scannerId = addScanner(scanner, region);
3217 scannerName = String.valueOf(scannerId);
3218 ttl = this.scannerLeaseTimeoutPeriod;
3219 }
3220
3221 if (rows > 0) {
3222
3223
3224
3225 if (request.hasNextCallSeq()) {
3226 if (rsh == null) {
3227 rsh = scanners.get(scannerName);
3228 }
3229 if (rsh != null) {
3230 if (request.getNextCallSeq() != rsh.nextCallSeq) {
3231 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
3232 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
3233 "; request=" + TextFormat.shortDebugString(request));
3234 }
3235
3236 rsh.nextCallSeq++;
3237 }
3238 }
3239 try {
3240
3241
3242 lease = leases.removeLease(scannerName);
3243
3244
3245 List<Result> results = new ArrayList<Result>(Math.min(rows, 100));
3246 long currentScanResultSize = 0;
3247 long totalKvSize = 0;
3248
3249 boolean done = false;
3250
3251 if (region != null && region.getCoprocessorHost() != null) {
3252 Boolean bypass = region.getCoprocessorHost().preScannerNext(
3253 scanner, results, rows);
3254 if (!results.isEmpty()) {
3255 for (Result r : results) {
3256 if (maxScannerResultSize < Long.MAX_VALUE){
3257 for (Cell cell : r.rawCells()) {
3258 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3259 currentScanResultSize += kv.heapSizeWithoutTags();
3260 totalKvSize += kv.getLength();
3261 }
3262 }
3263 }
3264 }
3265 if (bypass != null && bypass.booleanValue()) {
3266 done = true;
3267 }
3268 }
3269
3270 if (!done) {
3271 long maxResultSize = scanner.getMaxResultSize();
3272 if (maxResultSize <= 0) {
3273 maxResultSize = maxScannerResultSize;
3274 }
3275 List<Cell> values = new ArrayList<Cell>();
3276 region.startRegionOperation(Operation.SCAN);
3277 try {
3278 int i = 0;
3279 synchronized(scanner) {
3280 boolean moreRows = false;
3281 while (i < rows) {
3282
3283 if ((maxResultSize < Long.MAX_VALUE) &&
3284 (currentScanResultSize >= maxResultSize)) {
3285 builder.setMoreResultsInRegion(true);
3286 break;
3287 }
3288
3289 moreRows = scanner.nextRaw(values);
3290 if (!values.isEmpty()) {
3291 for (Cell cell : values) {
3292 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3293 currentScanResultSize += kv.heapSizeWithoutTags();
3294 totalKvSize += kv.getLength();
3295 }
3296 results.add(Result.create(values));
3297 i++;
3298 }
3299 if (!moreRows) {
3300 break;
3301 }
3302 values.clear();
3303 }
3304 if (currentScanResultSize >= maxResultSize || i >= rows || moreRows) {
3305
3306 builder.setMoreResultsInRegion(true);
3307 } else {
3308
3309 builder.setMoreResultsInRegion(false);
3310 }
3311 }
3312 region.readRequestsCount.add(i);
3313 region.getMetrics().updateScanNext(totalKvSize);
3314 } finally {
3315 region.closeRegionOperation();
3316 }
3317
3318
3319 if (region != null && region.getCoprocessorHost() != null) {
3320 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
3321 }
3322 }
3323
3324
3325
3326
3327 if (scanner.isFilterDone() && results.isEmpty()) {
3328 moreResults = false;
3329 results = null;
3330 } else {
3331 addResults(builder, results, controller);
3332 }
3333 } finally {
3334
3335
3336 if (scanners.containsKey(scannerName)) {
3337 if (lease != null) leases.addLease(lease);
3338 ttl = this.scannerLeaseTimeoutPeriod;
3339 }
3340 }
3341 }
3342
3343 if (!moreResults || closeScanner) {
3344 ttl = 0;
3345 moreResults = false;
3346 if (region != null && region.getCoprocessorHost() != null) {
3347 if (region.getCoprocessorHost().preScannerClose(scanner)) {
3348 return builder.build();
3349 }
3350 }
3351 rsh = scanners.remove(scannerName);
3352 if (rsh != null) {
3353 scanner = rsh.s;
3354 scanner.close();
3355 leases.cancelLease(scannerName);
3356 if (region != null && region.getCoprocessorHost() != null) {
3357 region.getCoprocessorHost().postScannerClose(scanner);
3358 }
3359 }
3360 }
3361
3362 if (ttl > 0) {
3363 builder.setTtl(ttl);
3364 }
3365 builder.setScannerId(scannerId);
3366 builder.setMoreResults(moreResults);
3367 return builder.build();
3368 } catch (IOException ie) {
3369 if (scannerName != null && ie instanceof NotServingRegionException) {
3370 RegionScannerHolder rsh = scanners.remove(scannerName);
3371 if (rsh != null) {
3372 try {
3373 RegionScanner scanner = rsh.s;
3374 LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
3375 scanner.close();
3376 leases.cancelLease(scannerName);
3377 } catch (IOException e) {
3378 LOG.warn("Getting exception closing " + scannerName, e);
3379 }
3380 }
3381 }
3382 throw new ServiceException(ie);
3383 }
3384 }
3385
3386 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
3387 final RpcController controller) {
3388 if (results == null || results.isEmpty()) return;
3389 if (isClientCellBlockSupport()) {
3390 for (Result res : results) {
3391 builder.addCellsPerResult(res.size());
3392 }
3393 ((PayloadCarryingRpcController)controller).
3394 setCellScanner(CellUtil.createCellScanner(results));
3395 } else {
3396 for (Result res: results) {
3397 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
3398 builder.addResults(pbr);
3399 }
3400 }
3401 }
3402
3403
3404
3405
3406
3407
3408 @Override
3409 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
3410 final BulkLoadHFileRequest request) throws ServiceException {
3411 try {
3412 checkOpen();
3413 requestCount.increment();
3414 HRegion region = getRegion(request.getRegion());
3415 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
3416 for (FamilyPath familyPath: request.getFamilyPathList()) {
3417 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
3418 familyPath.getPath()));
3419 }
3420 boolean bypass = false;
3421 if (region.getCoprocessorHost() != null) {
3422 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
3423 }
3424 boolean loaded = false;
3425 if (!bypass) {
3426 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
3427 }
3428 if (region.getCoprocessorHost() != null) {
3429 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
3430 }
3431 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
3432 builder.setLoaded(loaded);
3433 return builder.build();
3434 } catch (IOException ie) {
3435 throw new ServiceException(ie);
3436 }
3437 }
3438
3439 @Override
3440 public CoprocessorServiceResponse execService(final RpcController controller,
3441 final CoprocessorServiceRequest request) throws ServiceException {
3442 try {
3443 checkOpen();
3444 requestCount.increment();
3445 HRegion region = getRegion(request.getRegion());
3446 Message result = execServiceOnRegion(region, request.getCall());
3447 CoprocessorServiceResponse.Builder builder =
3448 CoprocessorServiceResponse.newBuilder();
3449 builder.setRegion(RequestConverter.buildRegionSpecifier(
3450 RegionSpecifierType.REGION_NAME, region.getRegionName()));
3451 builder.setValue(
3452 builder.getValueBuilder().setName(result.getClass().getName())
3453 .setValue(result.toByteString()));
3454 return builder.build();
3455 } catch (IOException ie) {
3456 throw new ServiceException(ie);
3457 }
3458 }
3459
3460 private Message execServiceOnRegion(HRegion region,
3461 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
3462
3463 ServerRpcController execController = new ServerRpcController();
3464 Message result = region.execService(execController, serviceCall);
3465 if (execController.getFailedOn() != null) {
3466 throw execController.getFailedOn();
3467 }
3468 return result;
3469 }
3470
3471 @Override
3472 public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3473 final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3474 try {
3475 ServerRpcController execController = new ServerRpcController();
3476 CoprocessorServiceCall call = serviceRequest.getCall();
3477 String serviceName = call.getServiceName();
3478 String methodName = call.getMethodName();
3479 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3480 throw new UnknownProtocolException(null,
3481 "No registered coprocessor service found for name " + serviceName);
3482 }
3483 Service service = coprocessorServiceHandlers.get(serviceName);
3484 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3485 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3486 if (methodDesc == null) {
3487 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3488 + " called on service " + serviceName);
3489 }
3490 Message request =
3491 service.getRequestPrototype(methodDesc).newBuilderForType().mergeFrom(call.getRequest())
3492 .build();
3493 final Message.Builder responseBuilder =
3494 service.getResponsePrototype(methodDesc).newBuilderForType();
3495 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3496 @Override
3497 public void run(Message message) {
3498 if (message != null) {
3499 responseBuilder.mergeFrom(message);
3500 }
3501 }
3502 });
3503 Message execResult = responseBuilder.build();
3504 if (execController.getFailedOn() != null) {
3505 throw execController.getFailedOn();
3506 }
3507 ClientProtos.CoprocessorServiceResponse.Builder builder =
3508 ClientProtos.CoprocessorServiceResponse.newBuilder();
3509 builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3510 HConstants.EMPTY_BYTE_ARRAY));
3511 builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3512 .setValue(execResult.toByteString()));
3513 return builder.build();
3514 } catch (IOException ie) {
3515 throw new ServiceException(ie);
3516 }
3517 }
3518
3519
3520
3521
3522
3523 public ReplicationSourceService getReplicationSourceService() {
3524 return replicationSourceHandler;
3525 }
3526
3527
3528
3529
3530
3531 public ReplicationSinkService getReplicationSinkService() {
3532 return replicationSinkHandler;
3533 }
3534
3535
3536
3537
3538
3539
3540
3541
3542 @Override
3543 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
3544 throws ServiceException {
3545 try {
3546 checkOpen();
3547 } catch (IOException ie) {
3548 throw new ServiceException(ie);
3549 }
3550
3551
3552
3553 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
3554 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
3555 if (controller != null) controller.setCellScanner(null);
3556
3557 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
3558
3559
3560 List<CellScannable> cellsToReturn = null;
3561 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
3562 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
3563 Boolean processed = null;
3564
3565 for (RegionAction regionAction : request.getRegionActionList()) {
3566 this.requestCount.add(regionAction.getActionCount());
3567 HRegion region;
3568 regionActionResultBuilder.clear();
3569 try {
3570 region = getRegion(regionAction.getRegion());
3571 } catch (IOException e) {
3572 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
3573 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
3574 continue;
3575 }
3576
3577 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
3578
3579
3580 try {
3581 if (request.hasCondition()) {
3582 Condition condition = request.getCondition();
3583 byte[] row = condition.getRow().toByteArray();
3584 byte[] family = condition.getFamily().toByteArray();
3585 byte[] qualifier = condition.getQualifier().toByteArray();
3586 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
3587 ByteArrayComparable comparator =
3588 ProtobufUtil.toComparator(condition.getComparator());
3589 processed = checkAndRowMutate(region, regionAction.getActionList(),
3590 cellScanner, row, family, qualifier, compareOp, comparator);
3591 } else {
3592 ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
3593 cellScanner);
3594
3595 if (stats != null) {
3596 responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
3597 .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
3598 }
3599 processed = Boolean.TRUE;
3600 }
3601 } catch (IOException e) {
3602
3603 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
3604 }
3605 } else {
3606
3607 cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
3608 regionActionResultBuilder, cellsToReturn, nonceGroup);
3609 }
3610 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
3611 }
3612
3613 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
3614 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
3615 }
3616 if (processed != null) responseBuilder.setProcessed(processed);
3617 return responseBuilder.build();
3618 }
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
/**
 * Runs the actions of one non-atomic {@code RegionAction} against a region.
 * <p>
 * Gets, coprocessor service calls, appends and increments are executed one by one and their
 * outcome (result or exception) is added to {@code builder} under the action's index. Puts and
 * deletes are collected and executed together through {@code doBatchOp}: before the next
 * mutation that is neither a put nor a delete, and once more at the end for whatever is left.
 *
 * @param region region to run the actions on
 * @param actions the region action whose action list is processed
 * @param cellScanner scanner over the cells that came with the request; passed on to the
 *        mutation helpers
 * @param builder receives one {@code ResultOrException} per directly executed action
 * @param cellsToReturn results to be returned as cells, or {@code null} if none so far; the
 *        list is created here on first use
 * @param nonceGroup nonce group passed on to append and increment
 * @return {@code cellsToReturn}, possibly newly created, possibly still {@code null}
 */
private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
    final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
  // Puts and deletes gathered so far and not yet executed; allocated on first use.
  List<ClientProtos.Action> mutations = null;
  for (ClientProtos.Action action: actions.getActionList()) {
    // Stays null for puts/deletes, which are only queued in this iteration.
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
    try {
      Result r = null;
      if (action.hasGet()) {
        Get get = ProtobufUtil.toGet(action.getGet());
        r = region.get(get);
      } else if (action.hasServiceCall()) {
        resultOrExceptionBuilder = ResultOrException.newBuilder();
        try {
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
              ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(
              serviceResultBuilder.setValue(
                serviceResultBuilder.getValueBuilder()
                  .setName(result.getClass().getName())
                  .setValue(result.toByteString())));
        } catch (IOException ioe) {
          // A failed service call is recorded on this action's own entry.
          resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
        }
      } else if (action.hasMutation()) {
        MutationType type = action.getMutation().getMutateType();
        if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
            !mutations.isEmpty()) {
          // Execute the queued puts/deletes before running a mutation of another type.
          doBatchOp(builder, region, mutations, cellScanner);
          mutations.clear();
        }
        switch (type) {
        case APPEND:
          r = append(region, action.getMutation(), cellScanner, nonceGroup);
          break;
        case INCREMENT:
          r = increment(region, action.getMutation(), cellScanner, nonceGroup);
          break;
        case PUT:
        case DELETE:
          // Queue for batched execution.
          if (mutations == null) {
            mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
          }
          mutations.add(action);
          break;
        default:
          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
      } else {
        throw new HBaseIOException("Unexpected Action type");
      }
      if (r != null) {
        ClientProtos.Result pbResult = null;
        if (isClientCellBlockSupport()) {
          pbResult = ProtobufUtil.toResultNoData(r);
          // The cells travel separately; remember the result so the caller can return them.
          if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
          cellsToReturn.add(r);
        } else {
          pbResult = ProtobufUtil.toResult(r);
        }
        resultOrExceptionBuilder =
          ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
      }
      // A put or delete leaves resultOrExceptionBuilder null here: nothing is added for it in
      // this iteration.
    } catch (IOException ie) {
      resultOrExceptionBuilder = ResultOrException.newBuilder().
        setException(ResponseConverter.buildException(ie));
    }
    if (resultOrExceptionBuilder != null) {
      // Tag the entry with the index the client gave the action.
      resultOrExceptionBuilder.setIndex(action.getIndex());
      builder.addResultOrException(resultOrExceptionBuilder.build());
    }
  }
  // Execute whatever puts/deletes are still queued.
  if (mutations != null && !mutations.isEmpty()) {
    doBatchOp(builder, region, mutations, cellScanner);
  }
  return cellsToReturn;
}
3722
3723
3724
3725
3726 @Override
3727 @QosPriority(priority=HConstants.HIGH_QOS)
3728 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
3729 final GetRegionInfoRequest request) throws ServiceException {
3730 try {
3731 checkOpen();
3732 requestCount.increment();
3733 HRegion region = getRegion(request.getRegion());
3734 HRegionInfo info = region.getRegionInfo();
3735 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
3736 builder.setRegionInfo(HRegionInfo.convert(info));
3737 if (request.hasCompactionState() && request.getCompactionState()) {
3738 builder.setCompactionState(region.getCompactionState());
3739 }
3740 builder.setIsRecovering(region.isRecovering());
3741 return builder.build();
3742 } catch (IOException ie) {
3743 throw new ServiceException(ie);
3744 }
3745 }
3746
3747 @Override
3748 public GetStoreFileResponse getStoreFile(final RpcController controller,
3749 final GetStoreFileRequest request) throws ServiceException {
3750 try {
3751 checkOpen();
3752 HRegion region = getRegion(request.getRegion());
3753 requestCount.increment();
3754 Set<byte[]> columnFamilies;
3755 if (request.getFamilyCount() == 0) {
3756 columnFamilies = region.getStores().keySet();
3757 } else {
3758 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
3759 for (ByteString cf: request.getFamilyList()) {
3760 columnFamilies.add(cf.toByteArray());
3761 }
3762 }
3763 int nCF = columnFamilies.size();
3764 List<String> fileList = region.getStoreFileList(
3765 columnFamilies.toArray(new byte[nCF][]));
3766 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
3767 builder.addAllStoreFile(fileList);
3768 return builder.build();
3769 } catch (IOException ie) {
3770 throw new ServiceException(ie);
3771 }
3772 }
3773
3774 @Override
3775 @QosPriority(priority=HConstants.HIGH_QOS)
3776 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
3777 final GetOnlineRegionRequest request) throws ServiceException {
3778 try {
3779 checkOpen();
3780 requestCount.increment();
3781 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
3782 for (HRegion region: this.onlineRegions.values()) {
3783 list.add(region.getRegionInfo());
3784 }
3785 Collections.sort(list);
3786 return ResponseConverter.buildGetOnlineRegionResponse(list);
3787 } catch (IOException ie) {
3788 throw new ServiceException(ie);
3789 }
3790 }
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
/**
 * Handles a request to open one or more regions on this server.
 * <p>
 * The actual opening is asynchronous: for each region that is not already online or in
 * transition, a handler is submitted to {@code service}. One opening state is added to the
 * response per region that is handled without error (or, for a request with more than one
 * region, also per region that failed).
 *
 * @param controller RPC controller (not used here)
 * @param request the regions to open, optionally with the start code of the intended server
 * @return response with the opening states
 * @throws ServiceException if the server is not open, the start code does not match, ZooKeeper
 *         access fails, or opening fails for a request that names a single region
 */
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
  try {
    checkOpen();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
  requestCount.increment();
  if (request.hasServerStartCode() && this.serverNameFromMasterPOV != null) {
    // Reject a request whose start code differs from this server's.
    long serverStartCode = request.getServerStartCode();
    if (this.serverNameFromMasterPOV.getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
          "different server with startCode: " + serverStartCode + ", this server is: "
          + this.serverNameFromMasterPOV));
    }
  }
  OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
  final int regionCount = request.getOpenInfoCount();
  // Table descriptors looked up during this request, keyed by table name.
  final Map<TableName, HTableDescriptor> htds =
      new HashMap<TableName, HTableDescriptor>(regionCount);
  // More than one region in the request: failures are reported per region instead of thrown.
  final boolean isBulkAssign = regionCount > 1;
  for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
    final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());

    int versionOfOfflineNode = -1;
    if (regionOpenInfo.hasVersionOfOfflineNode()) {
      versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
    }
    HTableDescriptor htd;
    try {
      final HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
      if (onlineRegion != null) {
        // The region is already online here; run the coprocessor pre-open hook first.
        if (onlineRegion.getCoprocessorHost() != null) {
          onlineRegion.getCoprocessorHost().preOpen();
        }
        // Look up which server the meta table records for this region.
        Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
            this.catalogTracker, region.getRegionName());
        if (this.getServerName().equals(p.getSecond())) {
          Boolean closing = regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
          // Report ALREADY_OPENED unless the region is marked as closing (FALSE) in the
          // transition map; the online check is repeated because the state may have changed
          // since the first lookup.
          if (!Boolean.FALSE.equals(closing)
              && getFromOnlineRegions(region.getEncodedName()) != null) {
            LOG.warn("Attempted open of " + region.getEncodedName()
              + " but already online on this server");
            builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
            continue;
          }
        } else {
          LOG.warn("The region " + region.getEncodedName() + " is online on this server" +
              " but hbase:meta does not have this server - continue opening.");
          removeFromOnlineRegions(onlineRegion, null);
        }
      }
      LOG.info("Open " + region.getRegionNameAsString());
      htd = htds.get(region.getTable());
      if (htd == null) {
        htd = this.tableDescriptors.get(region.getTable());
        htds.put(region.getTable(), htd);
      }

      // Mark the region as opening (TRUE) unless an entry already exists.
      final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(
          region.getEncodedNameAsBytes(), Boolean.TRUE);

      if (Boolean.FALSE.equals(previous)) {
        // An existing FALSE entry means a close is in progress: fail this open.
        OpenRegionHandler.
          tryTransitionFromOfflineToFailedOpen(this, region, versionOfOfflineNode);

        throw new RegionAlreadyInTransitionException("Received OPEN for the region:" +
          region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
      }

      if (Boolean.TRUE.equals(previous)) {
        // An open is already in progress; no second handler is submitted.
        LOG.info("Receiving OPEN for the region:" +
          region.getRegionNameAsString() + " , which we are already trying to OPEN" +
            " - ignoring this new request for this region.");
      }

      // The region is being opened here, so drop it from the moved-regions bookkeeping.
      removeFromMovedRegions(region.getEncodedName());

      if (previous == null) {
        // No transition was in progress: this call starts the open.
        if (SplitLogManager.isRegionMarkedRecoveringInZK(this.zooKeeper,
              region.getEncodedName())) {
          // The region is marked recovering in ZooKeeper. Register it as recovering unless
          // the request explicitly says it is not opened for distributed log replay.
          if (!regionOpenInfo.hasOpenForDistributedLogReplay()
                || regionOpenInfo.getOpenForDistributedLogReplay()) {
            this.recoveringRegions.put(region.getEncodedName(), null);
          } else {
            // Not opened for distributed log replay: delete the recovering znode.
            List<String> tmpRegions = new ArrayList<String>();
            tmpRegions.add(region.getEncodedName());
            SplitLogManager.deleteRecoveringRegionZNodes(this.zooKeeper, tmpRegions);
          }
        }
        // Submit the handler that performs the open; meta regions get their own handler.
        if (region.isMetaRegion()) {
          this.service.submit(new OpenMetaHandler(this, this, region, htd,
              versionOfOfflineNode));
        } else {
          updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
          this.service.submit(new OpenRegionHandler(this, this, region, htd,
              versionOfOfflineNode));
        }
      }

      builder.addOpeningState(RegionOpeningState.OPENED);

    } catch (KeeperException zooKeeperEx) {
      LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
      throw new ServiceException(zooKeeperEx);
    } catch (IOException ie) {
      LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
      if (isBulkAssign) {
        builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
      } else {
        throw new ServiceException(ie);
      }
    }
  }

  return builder.build();
}
3957
3958 @Override
3959 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3960 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3961 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3962
3963
3964 for (int i = 0; i < favoredNodes.size(); i++) {
3965 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3966 favoredNodes.get(i).getPort());
3967 }
3968 regionFavoredNodesMap.put(encodedRegionName, addr);
3969 }
3970
3971
3972
3973
3974
3975
3976
3977 @Override
3978 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3979 return regionFavoredNodesMap.get(encodedRegionName);
3980 }
3981
3982
3983
3984
3985
3986
3987
3988
3989 @Override
3990 @QosPriority(priority=HConstants.HIGH_QOS)
3991 public CloseRegionResponse closeRegion(final RpcController controller,
3992 final CloseRegionRequest request) throws ServiceException {
3993 int versionOfClosingNode = -1;
3994 if (request.hasVersionOfClosingNode()) {
3995 versionOfClosingNode = request.getVersionOfClosingNode();
3996 }
3997 boolean zk = request.getTransitionInZK();
3998 final ServerName sn = (request.hasDestinationServer() ?
3999 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
4000
4001 try {
4002 checkOpen();
4003 if (request.hasServerStartCode() && this.serverNameFromMasterPOV != null) {
4004
4005 long serverStartCode = request.getServerStartCode();
4006 if (this.serverNameFromMasterPOV.getStartcode() != serverStartCode) {
4007 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
4008 "different server with startCode: " + serverStartCode + ", this server is: "
4009 + this.serverNameFromMasterPOV));
4010 }
4011 }
4012 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
4013
4014
4015 final HRegion region = this.getFromOnlineRegions(encodedRegionName);
4016 if ((region != null) && (region .getCoprocessorHost() != null)) {
4017 region.getCoprocessorHost().preClose(false);
4018 }
4019
4020 requestCount.increment();
4021 LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? "yes" : "no") +
4022 ", znode version=" + versionOfClosingNode + ", on " + sn);
4023
4024 boolean closed = closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
4025 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
4026 return builder.build();
4027 } catch (IOException ie) {
4028 throw new ServiceException(ie);
4029 }
4030 }
4031
4032
4033
4034
4035
4036
4037
4038
4039 @Override
4040 @QosPriority(priority=HConstants.HIGH_QOS)
4041 public FlushRegionResponse flushRegion(final RpcController controller,
4042 final FlushRegionRequest request) throws ServiceException {
4043 try {
4044 checkOpen();
4045 requestCount.increment();
4046 HRegion region = getRegion(request.getRegion());
4047 LOG.info("Flushing " + region.getRegionNameAsString());
4048 boolean shouldFlush = true;
4049 if (request.hasIfOlderThanTs()) {
4050 shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
4051 }
4052 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
4053 if (shouldFlush) {
4054 long startTime = EnvironmentEdgeManager.currentTimeMillis();
4055 HRegion.FlushResult flushResult = region.flushcache();
4056 if (flushResult.isFlushSucceeded()) {
4057 long endTime = EnvironmentEdgeManager.currentTimeMillis();
4058 metricsRegionServer.updateFlushTime(endTime - startTime);
4059 }
4060 boolean result = flushResult.isCompactionNeeded();
4061 if (result) {
4062 this.compactSplitThread.requestSystemCompaction(region,
4063 "Compaction through user triggered flush");
4064 }
4065 builder.setFlushed(result);
4066 }
4067 builder.setLastFlushTime(region.getLastFlushTime());
4068 return builder.build();
4069 } catch (DroppedSnapshotException ex) {
4070
4071
4072
4073
4074 abort("Replay of HLog required. Forcing server shutdown", ex);
4075 throw new ServiceException(ex);
4076 } catch (IOException ie) {
4077 throw new ServiceException(ie);
4078 }
4079 }
4080
4081
4082
4083
4084
4085
4086
4087
4088 @Override
4089 @QosPriority(priority=HConstants.HIGH_QOS)
4090 public SplitRegionResponse splitRegion(final RpcController controller,
4091 final SplitRegionRequest request) throws ServiceException {
4092 try {
4093 checkOpen();
4094 requestCount.increment();
4095 HRegion region = getRegion(request.getRegion());
4096 region.startRegionOperation(Operation.SPLIT_REGION);
4097 LOG.info("Splitting " + region.getRegionNameAsString());
4098 long startTime = EnvironmentEdgeManager.currentTimeMillis();
4099 HRegion.FlushResult flushResult = region.flushcache();
4100 if (flushResult.isFlushSucceeded()) {
4101 long endTime = EnvironmentEdgeManager.currentTimeMillis();
4102 metricsRegionServer.updateFlushTime(endTime - startTime);
4103 }
4104 byte[] splitPoint = null;
4105 if (request.hasSplitPoint()) {
4106 splitPoint = request.getSplitPoint().toByteArray();
4107 }
4108 region.forceSplit(splitPoint);
4109 compactSplitThread.requestSplit(region, region.checkSplit());
4110 return SplitRegionResponse.newBuilder().build();
4111 } catch (IOException ie) {
4112 throw new ServiceException(ie);
4113 }
4114 }
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124 @Override
4125 @QosPriority(priority = HConstants.HIGH_QOS)
4126 public MergeRegionsResponse mergeRegions(final RpcController controller,
4127 final MergeRegionsRequest request) throws ServiceException {
4128 try {
4129 checkOpen();
4130 requestCount.increment();
4131 HRegion regionA = getRegion(request.getRegionA());
4132 HRegion regionB = getRegion(request.getRegionB());
4133 boolean forcible = request.getForcible();
4134 regionA.startRegionOperation(Operation.MERGE_REGION);
4135 regionB.startRegionOperation(Operation.MERGE_REGION);
4136 LOG.info("Receiving merging request for " + regionA + ", " + regionB
4137 + ",forcible=" + forcible);
4138 long startTime = EnvironmentEdgeManager.currentTimeMillis();
4139 HRegion.FlushResult flushResult = regionA.flushcache();
4140 if (flushResult.isFlushSucceeded()) {
4141 long endTime = EnvironmentEdgeManager.currentTimeMillis();
4142 metricsRegionServer.updateFlushTime(endTime - startTime);
4143 }
4144 startTime = EnvironmentEdgeManager.currentTimeMillis();
4145 flushResult = regionB.flushcache();
4146 if (flushResult.isFlushSucceeded()) {
4147 long endTime = EnvironmentEdgeManager.currentTimeMillis();
4148 metricsRegionServer.updateFlushTime(endTime - startTime);
4149 }
4150 compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
4151 return MergeRegionsResponse.newBuilder().build();
4152 } catch (IOException ie) {
4153 throw new ServiceException(ie);
4154 }
4155 }
4156
4157
4158
4159
4160
4161
4162
4163
4164 @Override
4165 @QosPriority(priority=HConstants.HIGH_QOS)
4166 public CompactRegionResponse compactRegion(final RpcController controller,
4167 final CompactRegionRequest request) throws ServiceException {
4168 try {
4169 checkOpen();
4170 requestCount.increment();
4171 HRegion region = getRegion(request.getRegion());
4172 region.startRegionOperation(Operation.COMPACT_REGION);
4173 LOG.info("Compacting " + region.getRegionNameAsString());
4174 boolean major = false;
4175 byte [] family = null;
4176 Store store = null;
4177 if (request.hasFamily()) {
4178 family = request.getFamily().toByteArray();
4179 store = region.getStore(family);
4180 if (store == null) {
4181 throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
4182 " does not exist in region " + region.getRegionNameAsString()));
4183 }
4184 }
4185 if (request.hasMajor()) {
4186 major = request.getMajor();
4187 }
4188 if (major) {
4189 if (family != null) {
4190 store.triggerMajorCompaction();
4191 } else {
4192 region.triggerMajorCompaction();
4193 }
4194 }
4195
4196 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
4197 LOG.trace("User-triggered compaction requested for region " +
4198 region.getRegionNameAsString() + familyLogMsg);
4199 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
4200 if(family != null) {
4201 compactSplitThread.requestCompaction(region, store, log,
4202 Store.PRIORITY_USER, null);
4203 } else {
4204 compactSplitThread.requestCompaction(region, log,
4205 Store.PRIORITY_USER, null);
4206 }
4207 return CompactRegionResponse.newBuilder().build();
4208 } catch (IOException ie) {
4209 throw new ServiceException(ie);
4210 }
4211 }
4212
4213
4214
4215
4216
4217
4218
4219
4220 @Override
4221 @QosPriority(priority=HConstants.REPLICATION_QOS)
4222 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
4223 final ReplicateWALEntryRequest request)
4224 throws ServiceException {
4225 try {
4226 if (replicationSinkHandler != null) {
4227 checkOpen();
4228 requestCount.increment();
4229 List<WALEntry> entries = request.getEntryList();
4230 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
4231 rsHost.preReplicateLogEntries(entries, cellScanner);
4232 replicationSinkHandler.replicateLogEntries(entries, cellScanner);
4233 rsHost.postReplicateLogEntries(entries, cellScanner);
4234 }
4235 return ReplicateWALEntryResponse.newBuilder().build();
4236 } catch (IOException ie) {
4237 throw new ServiceException(ie);
4238 }
4239 }
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249 @Override
4250 @QosPriority(priority = HConstants.REPLAY_QOS)
4251 public ReplicateWALEntryResponse replay(final RpcController controller,
4252 final ReplicateWALEntryRequest request) throws ServiceException {
4253 long before = EnvironmentEdgeManager.currentTimeMillis();
4254 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
4255 try {
4256 checkOpen();
4257 List<WALEntry> entries = request.getEntryList();
4258 if (entries == null || entries.isEmpty()) {
4259
4260 return ReplicateWALEntryResponse.newBuilder().build();
4261 }
4262 HRegion region = this.getRegionByEncodedName(
4263 entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
4264 RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
4265 List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
4266 List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
4267
4268 boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
4269 for (WALEntry entry : entries) {
4270 if (nonceManager != null) {
4271 long nonceGroup = entry.getKey().hasNonceGroup()
4272 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
4273 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
4274 nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
4275 }
4276 Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
4277 new Pair<HLogKey, WALEdit>();
4278 List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
4279 cells, walEntry, needAddReplayTag);
4280 if (coprocessorHost != null) {
4281
4282
4283 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
4284 walEntry.getSecond())) {
4285
4286 continue;
4287 }
4288 walEntries.add(walEntry);
4289 }
4290 mutations.addAll(edits);
4291 }
4292
4293 if (!mutations.isEmpty()) {
4294 OperationStatus[] result = doReplayBatchOp(region, mutations);
4295
4296 for (int i = 0; result != null && i < result.length; i++) {
4297 if (result[i] != OperationStatus.SUCCESS) {
4298 throw new IOException(result[i].getExceptionMsg());
4299 }
4300 }
4301 }
4302 if (coprocessorHost != null) {
4303 for (Pair<HLogKey, WALEdit> wal : walEntries) {
4304 coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
4305 wal.getSecond());
4306 }
4307 }
4308 return ReplicateWALEntryResponse.newBuilder().build();
4309 } catch (IOException ie) {
4310 throw new ServiceException(ie);
4311 } finally {
4312 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
4313 }
4314 }
4315
4316
4317
4318
4319
4320
4321
4322 @Override
4323 public RollWALWriterResponse rollWALWriter(final RpcController controller,
4324 final RollWALWriterRequest request) throws ServiceException {
4325 try {
4326 checkOpen();
4327 requestCount.increment();
4328 if (this.rsHost != null) {
4329 this.rsHost.preRollWALWriterRequest();
4330 }
4331 HLog wal = this.getWAL();
4332 byte[][] regionsToFlush = wal.rollWriter(true);
4333 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
4334 if (regionsToFlush != null) {
4335 for (byte[] region: regionsToFlush) {
4336 builder.addRegionToFlush(ByteStringer.wrap(region));
4337 }
4338 }
4339 return builder.build();
4340 } catch (IOException ie) {
4341 throw new ServiceException(ie);
4342 }
4343 }
4344
4345
4346
4347
4348
4349
4350
4351
4352 @Override
4353 public StopServerResponse stopServer(final RpcController controller,
4354 final StopServerRequest request) throws ServiceException {
4355 requestCount.increment();
4356 String reason = request.getReason();
4357 stop(reason);
4358 return StopServerResponse.newBuilder().build();
4359 }
4360
4361
4362
4363
4364
4365
4366
4367
4368 @Override
4369 public GetServerInfoResponse getServerInfo(final RpcController controller,
4370 final GetServerInfoRequest request) throws ServiceException {
4371 try {
4372 checkOpen();
4373 } catch (IOException ie) {
4374 throw new ServiceException(ie);
4375 }
4376 ServerName serverName = getServerName();
4377 requestCount.increment();
4378 return ResponseConverter.buildGetServerInfoResponse(serverName, rsInfo.getInfoPort());
4379 }
4380
4381
4382
4383
4384
4385
4386
4387
4388
4389
4390
4391 protected HRegion getRegion(
4392 final RegionSpecifier regionSpecifier) throws IOException {
4393 return getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
4394 ProtobufUtil.getRegionEncodedName(regionSpecifier));
4395 }
4396
4397
4398
4399
4400
4401
4402
4403
4404
4405
4406
4407 protected Result append(final HRegion region,
4408 final MutationProto m, final CellScanner cellScanner, long nonceGroup) throws IOException {
4409 long before = EnvironmentEdgeManager.currentTimeMillis();
4410 Append append = ProtobufUtil.toAppend(m, cellScanner);
4411 Result r = null;
4412 if (region.getCoprocessorHost() != null) {
4413 r = region.getCoprocessorHost().preAppend(append);
4414 }
4415 if (r == null) {
4416 long nonce = startNonceOperation(m, nonceGroup);
4417 boolean success = false;
4418 try {
4419 r = region.append(append, nonceGroup, nonce);
4420 success = true;
4421 } finally {
4422 endNonceOperation(m, nonceGroup, success);
4423 }
4424 if (region.getCoprocessorHost() != null) {
4425 region.getCoprocessorHost().postAppend(append, r);
4426 }
4427 }
4428 metricsRegionServer.updateAppend(EnvironmentEdgeManager.currentTimeMillis() - before);
4429 return r;
4430 }
4431
4432
4433
4434
4435
4436
4437
4438
4439
4440 protected Result increment(final HRegion region, final MutationProto mutation,
4441 final CellScanner cells, long nonceGroup) throws IOException {
4442 long before = EnvironmentEdgeManager.currentTimeMillis();
4443 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
4444 Result r = null;
4445 if (region.getCoprocessorHost() != null) {
4446 r = region.getCoprocessorHost().preIncrement(increment);
4447 }
4448 if (r == null) {
4449 long nonce = startNonceOperation(mutation, nonceGroup);
4450 boolean success = false;
4451 try {
4452 r = region.increment(increment, nonceGroup, nonce);
4453 success = true;
4454 } finally {
4455 endNonceOperation(mutation, nonceGroup, success);
4456 }
4457 if (region.getCoprocessorHost() != null) {
4458 r = region.getCoprocessorHost().postIncrement(increment, r);
4459 }
4460 }
4461 metricsRegionServer.updateIncrement(EnvironmentEdgeManager.currentTimeMillis() - before);
4462 return r;
4463 }
4464
4465
4466
4467
4468
4469
4470
4471 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
4472 throws IOException, OperationConflictException {
4473 if (nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
4474 boolean canProceed = false;
4475 try {
4476 canProceed = nonceManager.startOperation(nonceGroup, mutation.getNonce(), this);
4477 } catch (InterruptedException ex) {
4478 throw new InterruptedIOException("Nonce start operation interrupted");
4479 }
4480 if (!canProceed) {
4481
4482 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
4483 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
4484 + "] may have already completed";
4485 throw new OperationConflictException(message);
4486 }
4487 return mutation.getNonce();
4488 }
4489
4490
4491
4492
4493
4494
4495
4496 private void endNonceOperation(final MutationProto mutation, long nonceGroup,
4497 boolean success) {
4498 if (nonceManager == null || !mutation.hasNonce()) return;
4499 nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
4500 }
4501
4502 @Override
4503 public ServerNonceManager getNonceManager() {
4504 return this.nonceManager;
4505 }
4506
4507
4508
4509
4510
4511
4512
4513
4514 protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
4515 final List<ClientProtos.Action> mutations, final CellScanner cells) {
4516 Mutation[] mArray = new Mutation[mutations.size()];
4517 long before = EnvironmentEdgeManager.currentTimeMillis();
4518 boolean batchContainsPuts = false, batchContainsDelete = false;
4519 try {
4520 int i = 0;
4521 for (ClientProtos.Action action: mutations) {
4522 MutationProto m = action.getMutation();
4523 Mutation mutation;
4524 if (m.getMutateType() == MutationType.PUT) {
4525 mutation = ProtobufUtil.toPut(m, cells);
4526 batchContainsPuts = true;
4527 } else {
4528 mutation = ProtobufUtil.toDelete(m, cells);
4529 batchContainsDelete = true;
4530 }
4531 mArray[i++] = mutation;
4532 }
4533
4534 if (!region.getRegionInfo().isMetaTable()) {
4535 cacheFlusher.reclaimMemStoreMemory();
4536 }
4537
4538 OperationStatus codes[] = region.batchMutate(mArray);
4539 for (i = 0; i < codes.length; i++) {
4540 int index = mutations.get(i).getIndex();
4541 Exception e = null;
4542 switch (codes[i].getOperationStatusCode()) {
4543 case BAD_FAMILY:
4544 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
4545 builder.addResultOrException(getResultOrException(e, index));
4546 break;
4547
4548 case SANITY_CHECK_FAILURE:
4549 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
4550 builder.addResultOrException(getResultOrException(e, index));
4551 break;
4552
4553 default:
4554 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
4555 builder.addResultOrException(getResultOrException(e, index));
4556 break;
4557
4558 case SUCCESS:
4559 builder.addResultOrException(getResultOrException(
4560 ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats()));
4561 break;
4562 }
4563 }
4564 } catch (IOException ie) {
4565 for (int i = 0; i < mutations.size(); i++) {
4566 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
4567 }
4568 }
4569 long after = EnvironmentEdgeManager.currentTimeMillis();
4570 if (batchContainsPuts) {
4571 metricsRegionServer.updatePut(after - before);
4572 }
4573 if (batchContainsDelete) {
4574 metricsRegionServer.updateDelete(after - before);
4575 }
4576 }
4577
4578 private static ResultOrException getResultOrException(final ClientProtos.Result r,
4579 final int index, final ClientProtos.RegionLoadStats stats) {
4580 return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
4581 }
4582
4583 private static ResultOrException getResultOrException(final Exception e, final int index) {
4584 return getResultOrException(ResponseConverter.buildActionResult(e), index);
4585 }
4586
4587 private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
4588 final int index) {
4589 return builder.setIndex(index).build();
4590 }
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601 protected OperationStatus [] doReplayBatchOp(final HRegion region,
4602 final List<HLogSplitter.MutationReplay> mutations) throws IOException {
4603
4604 long before = EnvironmentEdgeManager.currentTimeMillis();
4605 boolean batchContainsPuts = false, batchContainsDelete = false;
4606 try {
4607 for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
4608 HLogSplitter.MutationReplay m = it.next();
4609 if (m.type == MutationType.PUT) {
4610 batchContainsPuts = true;
4611 } else {
4612 batchContainsDelete = true;
4613 }
4614 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
4615 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
4616 if (metaCells != null && !metaCells.isEmpty()) {
4617 for (Cell metaCell : metaCells) {
4618 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
4619 if (compactionDesc != null) {
4620 region.completeCompactionMarker(compactionDesc);
4621 }
4622 }
4623 it.remove();
4624 }
4625 }
4626 requestCount.add(mutations.size());
4627 if (!region.getRegionInfo().isMetaTable()) {
4628 cacheFlusher.reclaimMemStoreMemory();
4629 }
4630 return region.batchReplay(mutations.toArray(
4631 new HLogSplitter.MutationReplay[mutations.size()]));
4632 } finally {
4633 long after = EnvironmentEdgeManager.currentTimeMillis();
4634 if (batchContainsPuts) {
4635 metricsRegionServer.updatePut(after - before);
4636 }
4637 if (batchContainsDelete) {
4638 metricsRegionServer.updateDelete(after - before);
4639 }
4640 }
4641 }
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651 protected ClientProtos.RegionLoadStats mutateRows(final HRegion region,
4652 final List<ClientProtos.Action> actions, final CellScanner cellScanner)
4653 throws IOException {
4654 if (!region.getRegionInfo().isMetaTable()) {
4655 cacheFlusher.reclaimMemStoreMemory();
4656 }
4657 RowMutations rm = null;
4658 for (ClientProtos.Action action: actions) {
4659 if (action.hasGet()) {
4660 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
4661 action.getGet());
4662 }
4663 MutationType type = action.getMutation().getMutateType();
4664 if (rm == null) {
4665 rm = new RowMutations(action.getMutation().getRow().toByteArray());
4666 }
4667 switch (type) {
4668 case PUT:
4669 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
4670 break;
4671 case DELETE:
4672 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
4673 break;
4674 default:
4675 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
4676 }
4677 }
4678 region.mutateRow(rm);
4679 return region.getRegionStats();
4680 }
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695 private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
4696 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
4697 CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
4698 if (!region.getRegionInfo().isMetaTable()) {
4699 cacheFlusher.reclaimMemStoreMemory();
4700 }
4701 RowMutations rm = null;
4702 for (ClientProtos.Action action: actions) {
4703 if (action.hasGet()) {
4704 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
4705 action.getGet());
4706 }
4707 MutationType type = action.getMutation().getMutateType();
4708 if (rm == null) {
4709 rm = new RowMutations(action.getMutation().getRow().toByteArray());
4710 }
4711 switch (type) {
4712 case PUT:
4713 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
4714 break;
4715 case DELETE:
4716 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
4717 break;
4718 default:
4719 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
4720 }
4721 }
4722 return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
4723 }
4724
4725 private static class MovedRegionInfo {
4726 private final ServerName serverName;
4727 private final long seqNum;
4728 private final long ts;
4729
4730 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
4731 this.serverName = serverName;
4732 this.seqNum = closeSeqNum;
4733 ts = EnvironmentEdgeManager.currentTimeMillis();
4734 }
4735
4736 public ServerName getServerName() {
4737 return serverName;
4738 }
4739
4740 public long getSeqNum() {
4741 return seqNum;
4742 }
4743
4744 public long getMoveTime() {
4745 return ts;
4746 }
4747 }
4748
4749
4750
// Records of regions that moved away from this server, keyed by encoded region name.
// Entries are added by addToMovedRegions and expire after TIMEOUT_REGION_MOVED
// (see getMovedRegion and cleanMovedRegions). 3000 is the map's initial capacity.
protected Map<String, MovedRegionInfo> movedRegions =
    new ConcurrentHashMap<String, MovedRegionInfo>(3000);
4753
4754
4755
// How long, in milliseconds (2 minutes), a moved-region record stays valid. Also used as
// the period of the MovedRegionsCleaner chore.
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
4757
4758 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
4759 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
4760 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
4761 return;
4762 }
4763 LOG.info("Adding moved region record: " + encodedName + " to "
4764 + destination.getServerName() + ":" + destination.getPort()
4765 + " as of " + closeSeqNum);
4766 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
4767 }
4768
4769 private void removeFromMovedRegions(String encodedName) {
4770 movedRegions.remove(encodedName);
4771 }
4772
4773 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
4774 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
4775
4776 long now = EnvironmentEdgeManager.currentTimeMillis();
4777 if (dest != null) {
4778 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
4779 return dest;
4780 } else {
4781 movedRegions.remove(encodedRegionName);
4782 }
4783 }
4784
4785 return null;
4786 }
4787
4788
4789
4790
4791 protected void cleanMovedRegions() {
4792 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
4793 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
4794
4795 while (it.hasNext()){
4796 Map.Entry<String, MovedRegionInfo> e = it.next();
4797 if (e.getValue().getMoveTime() < cutOff) {
4798 it.remove();
4799 }
4800 }
4801 }
4802
4803
4804
4805
4806 protected static class MovedRegionsCleaner extends Chore implements Stoppable {
4807 private HRegionServer regionServer;
4808 Stoppable stoppable;
4809
4810 private MovedRegionsCleaner(
4811 HRegionServer regionServer, Stoppable stoppable){
4812 super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable);
4813 this.regionServer = regionServer;
4814 this.stoppable = stoppable;
4815 }
4816
4817 static MovedRegionsCleaner createAndStart(HRegionServer rs){
4818 Stoppable stoppable = new Stoppable() {
4819 private volatile boolean isStopped = false;
4820 @Override public void stop(String why) { isStopped = true;}
4821 @Override public boolean isStopped() {return isStopped;}
4822 };
4823
4824 return new MovedRegionsCleaner(rs, stoppable);
4825 }
4826
4827 @Override
4828 protected void chore() {
4829 regionServer.cleanMovedRegions();
4830 }
4831
4832 @Override
4833 public void stop(String why) {
4834 stoppable.stop(why);
4835 }
4836
4837 @Override
4838 public boolean isStopped() {
4839 return stoppable.isStopped();
4840 }
4841 }
4842
4843 private String getMyEphemeralNodePath() {
4844 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
4845 }
4846
4847
4848
4849
4850 private static class RegionScannerHolder {
4851 private RegionScanner s;
4852 private long nextCallSeq = 0L;
4853 private HRegion r;
4854
4855 public RegionScannerHolder(RegionScanner s, HRegion r) {
4856 this.s = s;
4857 this.r = r;
4858 }
4859 }
4860
4861 private boolean isHealthCheckerConfigured() {
4862 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
4863 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
4864 }
4865
4866
4867
4868
4869 public CompactSplitThread getCompactSplitThread() {
4870 return this.compactSplitThread;
4871 }
4872
4873
4874
4875
4876
4877
4878
4879
4880 private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
4881 IOException {
4882 if (!r.isRecovering()) {
4883
4884 return;
4885 }
4886
4887 HRegionInfo region = r.getRegionInfo();
4888 ZooKeeperWatcher zkw = getZooKeeper();
4889 String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
4890 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
4891 long minSeqIdForLogReplay = -1;
4892 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
4893 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
4894 minSeqIdForLogReplay = storeSeqIdForReplay;
4895 }
4896 }
4897
4898 try {
4899 long lastRecordedFlushedSequenceId = -1;
4900 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
4901 region.getEncodedName());
4902
4903 byte[] data = ZKUtil.getData(zkw, nodePath);
4904 if (data != null) {
4905 lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
4906 }
4907 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
4908 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
4909 }
4910 if (previousRSName != null) {
4911
4912 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
4913 ZKUtil.setData(zkw, nodePath,
4914 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
4915 LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
4916 + previousRSName);
4917 } else {
4918 LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
4919 }
4920 } catch (NoNodeException ignore) {
4921 LOG.debug("Region " + region.getEncodedName() +
4922 " must have completed recovery because its recovery znode has been removed", ignore);
4923 }
4924 }
4925
4926
4927
4928
4929
4930
4931 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
4932 String result = null;
4933 long maxZxid = 0;
4934 ZooKeeperWatcher zkw = this.getZooKeeper();
4935 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
4936 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
4937 if (failedServers == null || failedServers.isEmpty()) {
4938 return result;
4939 }
4940 for (String failedServer : failedServers) {
4941 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
4942 Stat stat = new Stat();
4943 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
4944 if (maxZxid < stat.getCzxid()) {
4945 maxZxid = stat.getCzxid();
4946 result = failedServer;
4947 }
4948 }
4949 return result;
4950 }
4951
4952 @Override
4953 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
4954 UpdateFavoredNodesRequest request) throws ServiceException {
4955 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
4956 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
4957 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
4958 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
4959 updateRegionFavoredNodesMapping(hri.getEncodedName(),
4960 regionUpdateInfo.getFavoredNodesList());
4961 }
4962 respBuilder.setResponse(openInfoList.size());
4963 return respBuilder.build();
4964 }
4965
4966
4967
4968
4969 public CacheConfig getCacheConfig() {
4970 return this.cacheConfig;
4971 }
4972
4973 @Override
4974 public HeapMemoryManager getHeapMemoryManager() {
4975 return hMemManager;
4976 }
4977
4978 @Override
4979 public double getCompactionPressure() {
4980 double max = 0;
4981 for (HRegion region : onlineRegions.values()) {
4982 for (Store store : region.getStores().values()) {
4983 double normCount = store.getCompactionPressure();
4984 if (normCount > max) {
4985 max = normCount;
4986 }
4987 }
4988 }
4989 return max;
4990 }
4991 }