View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
22  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
23  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.ListIterator;
29  import java.util.Map;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.MetaMutationAnnotation;
38  import org.apache.hadoop.hbase.RegionTransition;
39  import org.apache.hadoop.hbase.Server;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.catalog.CatalogTracker;
42  import org.apache.hadoop.hbase.catalog.MetaEditor;
43  import org.apache.hadoop.hbase.catalog.MetaReader;
44  import org.apache.hadoop.hbase.client.Delete;
45  import org.apache.hadoop.hbase.client.HTable;
46  import org.apache.hadoop.hbase.client.Mutation;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.executor.EventType;
49  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
50  import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.ConfigUtil;
53  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54  import org.apache.hadoop.hbase.util.Pair;
55  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
56  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
57  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
58  import org.apache.zookeeper.KeeperException;
59  import org.apache.zookeeper.KeeperException.NodeExistsException;
60  import org.apache.zookeeper.data.Stat;
61  
62  /**
63   * Executes region merge as a "transaction". It is similar to
64   * SplitTransaction. Call {@link #prepare(RegionServerServices)} to setup the
65   * transaction, {@link #execute(Server, RegionServerServices)} to run the
66   * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if
67   * execute fails.
68   * 
69   * <p>
70   * Here is an example of how you would use this class:
71   * 
72   * <pre>
73   *  RegionMergeTransaction mt = new RegionMergeTransaction(regionA, regionB, forcible);
74   *  if (!mt.prepare(services)) return;
75   *  try {
76   *    mt.execute(server, services);
77   *  } catch (IOException ioe) {
78   *    try {
79   *      mt.rollback(server, services);
80   *      return;
81   *    } catch (RuntimeException e) {
82   *      myAbortable.abort("Failed merge, abort");
83   *    }
84   *  }
85   * </pre>
86   * <p>
87   * This class is not thread safe. Caller needs to ensure merge is run by one thread
88   * only.
89   */
90  @InterfaceAudience.Private
91  public class RegionMergeTransaction {
  private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);

  // Merged region info; assigned by prepare() once the merge inputs pass its checks
  private HRegionInfo mergedRegionInfo;
  // region_a sorts before region_b (the constructor orders the two inputs)
  private final HRegion region_a;
  private final HRegion region_b;
  // merges dir is under region_a
  private final Path mergesdir;
  // Version of the merging znode; stays -1 until a version is read back from ZooKeeper
  private int znodeVersion = -1;
  // We only merge adjacent regions if forcible is false
  private final boolean forcible;
  // Whether assignment goes through ZooKeeper; decided at the start of execute()
  private boolean useZKForAssignment;
105 
  /**
   * Types to add to the transaction journal. Each enum is a step in the merge
   * transaction. Used to figure how much we need to rollback.
   * <p>
   * The constants are declared in the same order in which the transaction
   * appends them to the journal.
   */
  enum JournalEntry {
    /**
     * Set region as in transition, set it into MERGING state. Recorded by
     * stepsBeforePONR once it is past the step that announces the merge
     * (merging znode, or a READY_TO_MERGE report to the master).
     */
    SET_MERGING_IN_ZK,
    /**
     * We created the temporary merge data directory.
     */
    CREATED_MERGE_DIR,
    /**
     * Closed the merging region A.
     */
    CLOSED_REGION_A,
    /**
     * The merging region A has been taken out of the server's online regions list.
     */
    OFFLINED_REGION_A,
    /**
     * Closed the merging region B.
     */
    CLOSED_REGION_B,
    /**
     * The merging region B has been taken out of the server's online regions list.
     */
    OFFLINED_REGION_B,
    /**
     * Started in on creation of the merged region. Recorded BEFORE the merged
     * region is created so that a failure halfway through is still cleaned up.
     */
    STARTED_MERGED_REGION_CREATION,
    /**
     * Point of no return. If we got here, then transaction is not recoverable
     * other than by crashing out the regionserver.
     */
    PONR
  }
145 
146   /*
147    * Journal of how far the merge transaction has progressed.
148    */
149   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
150 
151   private static IOException closedByOtherException = new IOException(
152       "Failed to close region: already closed by another thread");
153 
154   private RegionServerCoprocessorHost rsCoprocessorHost = null;
155 
156   /**
157    * Constructor
158    * @param a region a to merge
159    * @param b region b to merge
160    * @param forcible if false, we will only merge adjacent regions
161    */
162   public RegionMergeTransaction(final HRegion a, final HRegion b,
163       final boolean forcible) {
164     if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
165       this.region_a = a;
166       this.region_b = b;
167     } else {
168       this.region_a = b;
169       this.region_b = a;
170     }
171     this.forcible = forcible;
172     this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
173   }
174 
175   /**
176    * Does checks on merge inputs.
177    * @param services
178    * @return <code>true</code> if the regions are mergeable else
179    *         <code>false</code> if they are not (e.g. its already closed, etc.).
180    */
181   public boolean prepare(final RegionServerServices services) {
182     if (!region_a.getTableDesc().getTableName()
183         .equals(region_b.getTableDesc().getTableName())) {
184       LOG.info("Can't merge regions " + region_a + "," + region_b
185           + " because they do not belong to the same table");
186       return false;
187     }
188     if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
189       LOG.info("Can't merge the same region " + region_a);
190       return false;
191     }
192     if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
193             region_b.getRegionInfo())) {
194       String msg = "Skip merging " + this.region_a.getRegionNameAsString()
195           + " and " + this.region_b.getRegionNameAsString()
196           + ", because they are not adjacent.";
197       LOG.info(msg);
198       return false;
199     }
200     if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
201       return false;
202     }
203     try {
204       boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
205           region_a.getRegionName());
206       if (regionAHasMergeQualifier ||
207           hasMergeQualifierInMeta(services, region_b.getRegionName())) {
208         LOG.debug("Region " + (regionAHasMergeQualifier ? region_a.getRegionNameAsString()
209                 : region_b.getRegionNameAsString())
210             + " is not mergeable because it has merge qualifier in META");
211         return false;
212       }
213     } catch (IOException e) {
214       LOG.warn("Failed judging whether merge transaction is available for "
215               + region_a.getRegionNameAsString() + " and "
216               + region_b.getRegionNameAsString(), e);
217       return false;
218     }
219 
220     // WARN: make sure there is no parent region of the two merging regions in
221     // hbase:meta If exists, fixing up daughters would cause daughter regions(we
222     // have merged one) online again when we restart master, so we should clear
223     // the parent region to prevent the above case
224     // Since HBASE-7721, we don't need fix up daughters any more. so here do
225     // nothing
226 
227     this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
228         region_b.getRegionInfo());
229     return true;
230   }
231 
232   /**
233    * Run the transaction.
234    * @param server Hosting server instance. Can be null when testing (won't try
235    *          and update in zk if a null server)
236    * @param services Used to online/offline regions.
237    * @throws IOException If thrown, transaction failed. Call
238    *           {@link #rollback(Server, RegionServerServices)}
239    * @return merged region
240    * @throws IOException
241    * @see #rollback(Server, RegionServerServices)
242    */
243   public HRegion execute(final Server server,
244       final RegionServerServices services) throws IOException {
245     useZKForAssignment = server == null ? true :
246       ConfigUtil.useZKForAssignment(server.getConfiguration());
247     if (rsCoprocessorHost == null) {
248       rsCoprocessorHost = server != null ? ((HRegionServer) server).getCoprocessorHost() : null;
249     }
250     HRegion mergedRegion = createMergedRegion(server, services);
251     if (rsCoprocessorHost != null) {
252       rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
253     }
254     return stepsAfterPONR(server, services, mergedRegion);
255   }
256 
257   public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
258       HRegion mergedRegion) throws IOException {
259     openMergedRegion(server, services, mergedRegion);
260     transitionZKNode(server, services, mergedRegion);
261     return mergedRegion;
262   }
263 
264   /**
265    * Prepare the merged region and region files.
266    * @param server Hosting server instance. Can be null when testing (won't try
267    *          and update in zk if a null server)
268    * @param services Used to online/offline regions.
269    * @return merged region
270    * @throws IOException If thrown, transaction failed. Call
271    *           {@link #rollback(Server, RegionServerServices)}
272    */
273   HRegion createMergedRegion(final Server server,
274       final RegionServerServices services) throws IOException {
275     LOG.info("Starting merge of " + region_a + " and "
276         + region_b.getRegionNameAsString() + ", forcible=" + forcible);
277     if ((server != null && server.isStopped())
278         || (services != null && services.isStopping())) {
279       throw new IOException("Server is stopped or stopping");
280     }
281 
282     if (rsCoprocessorHost != null) {
283       if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
284         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
285             + this.region_b + " merge.");
286       }
287     }
288 
289     // If true, no cluster to write meta edits to or to update znodes in.
290     boolean testing = server == null ? true : server.getConfiguration()
291         .getBoolean("hbase.testing.nocluster", false);
292 
293     HRegion mergedRegion = stepsBeforePONR(server, services, testing);
294 
295     @MetaMutationAnnotation
296     List<Mutation> metaEntries = new ArrayList<Mutation>();
297     if (rsCoprocessorHost != null) {
298       if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
299         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
300             + this.region_b + " merge.");
301       }
302       try {
303         for (Mutation p : metaEntries) {
304           HRegionInfo.parseRegionName(p.getRow());
305         }
306       } catch (IOException e) {
307         LOG.error("Row key of mutation from coprocessor is not parsable as region name."
308             + "Mutations from coprocessor should only be for hbase:meta table.", e);
309         throw e;
310       }
311     }
312 
313     // This is the point of no return. Similar with SplitTransaction.
314     // IF we reach the PONR then subsequent failures need to crash out this
315     // regionserver
316     this.journal.add(JournalEntry.PONR);
317 
318     // Add merged region and delete region_a and region_b
319     // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
320     // will determine whether the region is merged or not in case of failures.
321     // If it is successful, master will roll-forward, if not, master will
322     // rollback
323     if (!testing && useZKForAssignment) {
324       if (metaEntries.isEmpty()) {
325         MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
326             .getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
327       } else {
328         mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
329           region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
330       }
331     } else if (services != null && !useZKForAssignment) {
332       if (!services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
333           mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
334         // Passed PONR, let SSH clean it up
335         throw new IOException("Failed to notify master that merge passed PONR: "
336           + region_a.getRegionInfo().getRegionNameAsString() + " and "
337           + region_b.getRegionInfo().getRegionNameAsString());
338       }
339     }
340     return mergedRegion;
341   }
342 
343   private void mergeRegionsAndPutMetaEntries(CatalogTracker catalogTracker,
344       HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName,
345       List<Mutation> metaEntries) throws IOException {
346     prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries);
347     MetaEditor.mutateMetaTable(catalogTracker, metaEntries);
348   }
349 
350   public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
351       HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
352     HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
353 
354     // Put for parent
355     Put putOfMerged = MetaEditor.makePutFromRegionInfo(copyOfMerged);
356     putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray());
357     putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray());
358     mutations.add(putOfMerged);
359     // Deletes for merging regions
360     Delete deleteA = MetaEditor.makeDeleteFromRegionInfo(regionA);
361     Delete deleteB = MetaEditor.makeDeleteFromRegionInfo(regionB);
362     mutations.add(deleteA);
363     mutations.add(deleteB);
364     // The merged is a new region, openSeqNum = 1 is fine.
365     addLocation(putOfMerged, serverName, 1);
366   }
367 
368   @SuppressWarnings("deprecation")
369   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
370     p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
371         .toBytes(sn.getHostAndPort()));
372     p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
373         .getStartcode()));
374     p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
375     return p;
376   }
377 
378   public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
379       boolean testing) throws IOException {
380     // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
381     // have zookeeper so don't do zk stuff if server or zookeeper is null
382     if (useZKAndZKIsSet(server)) {
383       try {
384         createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
385           server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
386       } catch (KeeperException e) {
387         throw new IOException("Failed creating PENDING_MERGE znode on "
388             + this.mergedRegionInfo.getRegionNameAsString(), e);
389       }
390     } else if (services != null && !useZKForAssignment) {
391       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
392           mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
393         throw new IOException("Failed to get ok from master to merge "
394           + region_a.getRegionInfo().getRegionNameAsString() + " and "
395           + region_b.getRegionInfo().getRegionNameAsString());
396       }
397     }
398     this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
399     if (useZKAndZKIsSet(server)) {
400       // After creating the merge node, wait for master to transition it
401       // from PENDING_MERGE to MERGING so that we can move on. We want master
402       // knows about it and won't transition any region which is merging.
403       znodeVersion = getZKNode(server, services);
404     }
405 
406     this.region_a.getRegionFileSystem().createMergesDir();
407     this.journal.add(JournalEntry.CREATED_MERGE_DIR);
408 
409     Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
410         services, this.region_a, true, testing);
411     Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
412         services, this.region_b, false, testing);
413 
414     assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
415 
416 
417     //
418     // mergeStoreFiles creates merged region dirs under the region_a merges dir
419     // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
420     // clean this up.
421     mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
422 
423     if (server != null && useZKAndZKIsSet(server)) {
424       try {
425         // Do one more check on the merging znode (before it is too late) in case
426         // any merging region is moved somehow. If so, the znode transition will fail.
427         this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
428           this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
429           server.getServerName(), this.znodeVersion,
430           RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
431       } catch (KeeperException e) {
432         throw new IOException("Failed setting MERGING znode on "
433             + this.mergedRegionInfo.getRegionNameAsString(), e);
434       }
435     }
436 
437     // Log to the journal that we are creating merged region. We could fail
438     // halfway through. If we do, we could have left
439     // stuff in fs that needs cleanup -- a storefile or two. Thats why we
440     // add entry to journal BEFORE rather than AFTER the change.
441     this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
442     HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
443         this.region_b, this.mergedRegionInfo);
444     return mergedRegion;
445   }
446 
447   /**
448    * Create a merged region from the merges directory under region a. In order
449    * to mock it for tests, place it with a new method.
450    * @param a hri of region a
451    * @param b hri of region b
452    * @param mergedRegion hri of merged region
453    * @return merged HRegion.
454    * @throws IOException
455    */
456   HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
457       final HRegionInfo mergedRegion) throws IOException {
458     return a.createMergedRegionFromMerges(mergedRegion, b);
459   }
460 
461   /**
462    * Close the merging region and offline it in regionserver
463    * @param services
464    * @param region
465    * @param isRegionA true if it is merging region a, false if it is region b
466    * @param testing true if it is testing
467    * @return a map of family name to list of store files
468    * @throws IOException
469    */
470   private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
471       final RegionServerServices services, final HRegion region,
472       final boolean isRegionA, final boolean testing) throws IOException {
473     Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
474     Exception exceptionToThrow = null;
475     try {
476       hstoreFilesToMerge = region.close(false);
477     } catch (Exception e) {
478       exceptionToThrow = e;
479     }
480     if (exceptionToThrow == null && hstoreFilesToMerge == null) {
481       // The region was closed by a concurrent thread. We can't continue
482       // with the merge, instead we must just abandon the merge. If we
483       // reopen or merge this could cause problems because the region has
484       // probably already been moved to a different server, or is in the
485       // process of moving to a different server.
486       exceptionToThrow = closedByOtherException;
487     }
488     if (exceptionToThrow != closedByOtherException) {
489       this.journal.add(isRegionA ? JournalEntry.CLOSED_REGION_A
490           : JournalEntry.CLOSED_REGION_B);
491     }
492     if (exceptionToThrow != null) {
493       if (exceptionToThrow instanceof IOException)
494         throw (IOException) exceptionToThrow;
495       throw new IOException(exceptionToThrow);
496     }
497 
498     if (!testing) {
499       services.removeFromOnlineRegions(region, null);
500     }
501     this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A
502         : JournalEntry.OFFLINED_REGION_B);
503     return hstoreFilesToMerge;
504   }
505 
506   /**
507    * Get merged region info through the specified two regions
508    * @param a merging region A
509    * @param b merging region B
510    * @return the merged region info
511    */
512   public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
513       final HRegionInfo b) {
514     long rid = EnvironmentEdgeManager.currentTimeMillis();
515     // Regionid is timestamp. Merged region's id can't be less than that of
516     // merging regions else will insert at wrong location in hbase:meta
517     if (rid < a.getRegionId() || rid < b.getRegionId()) {
518       LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
519           + " and " + b.getRegionId() + ", but current time here is " + rid);
520       rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
521     }
522 
523     byte[] startKey = null;
524     byte[] endKey = null;
525     // Choose the smaller as start key
526     if (a.compareTo(b) <= 0) {
527       startKey = a.getStartKey();
528     } else {
529       startKey = b.getStartKey();
530     }
531     // Choose the bigger as end key
532     if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
533         || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
534             && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
535       endKey = a.getEndKey();
536     } else {
537       endKey = b.getEndKey();
538     }
539 
540     // Merged region is sorted between two merging regions in META
541     HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
542         endKey, false, rid);
543     return mergedRegionInfo;
544   }
545 
546   /**
547    * Perform time consuming opening of the merged region.
548    * @param server Hosting server instance. Can be null when testing (won't try
549    *          and update in zk if a null server)
550    * @param services Used to online/offline regions.
551    * @param merged the merged region
552    * @throws IOException If thrown, transaction failed. Call
553    *           {@link #rollback(Server, RegionServerServices)}
554    */
555   void openMergedRegion(final Server server,
556       final RegionServerServices services, HRegion merged) throws IOException {
557     boolean stopped = server != null && server.isStopped();
558     boolean stopping = services != null && services.isStopping();
559     if (stopped || stopping) {
560       LOG.info("Not opening merged region  " + merged.getRegionNameAsString()
561           + " because stopping=" + stopping + ", stopped=" + stopped);
562       return;
563     }
564     HRegionInfo hri = merged.getRegionInfo();
565     LoggingProgressable reporter = server == null ? null
566         : new LoggingProgressable(hri, server.getConfiguration().getLong(
567             "hbase.regionserver.regionmerge.open.log.interval", 10000));
568     merged.openHRegion(reporter);
569 
570     if (services != null) {
571       try {
572         if (useZKForAssignment) {
573           services.postOpenDeployTasks(merged, server.getCatalogTracker());
574         } else if (!services.reportRegionStateTransition(TransitionCode.MERGED,
575             mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
576           throw new IOException("Failed to report merged region to master: "
577             + mergedRegionInfo.getShortNameToLog());
578         }
579         services.addToOnlineRegions(merged);
580       } catch (KeeperException ke) {
581         throw new IOException(ke);
582       }
583     }
584 
585   }
586 
587   /**
588    * Finish off merge transaction, transition the zknode
589    * @param server Hosting server instance. Can be null when testing (won't try
590    *          and update in zk if a null server)
591    * @param services Used to online/offline regions.
592    * @throws IOException If thrown, transaction failed. Call
593    *           {@link #rollback(Server, RegionServerServices)}
594    */
595   void transitionZKNode(final Server server, final RegionServerServices services,
596       HRegion mergedRegion) throws IOException {
597     if (useZKAndZKIsSet(server)) {
598       // Tell master about merge by updating zk. If we fail, abort.
599       try {
600         this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
601           this.mergedRegionInfo, region_a.getRegionInfo(),
602           region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
603           RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
604   
605         long startTime = EnvironmentEdgeManager.currentTimeMillis();
606         int spins = 0;
607         // Now wait for the master to process the merge. We know it's done
608         // when the znode is deleted. The reason we keep tickling the znode is
609         // that it's possible for the master to miss an event.
610         do {
611           if (spins % 10 == 0) {
612             LOG.debug("Still waiting on the master to process the merge for "
613                 + this.mergedRegionInfo.getEncodedName() + ", waited "
614                 + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
615           }
616           Thread.sleep(100);
617           // When this returns -1 it means the znode doesn't exist
618           this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
619             this.mergedRegionInfo, region_a.getRegionInfo(),
620             region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
621             RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
622           spins++;
623         } while (this.znodeVersion != -1 && !server.isStopped()
624             && !services.isStopping());
625       } catch (Exception e) {
626         if (e instanceof InterruptedException) {
627           Thread.currentThread().interrupt();
628         }
629         throw new IOException("Failed telling master about merge "
630             + mergedRegionInfo.getEncodedName(), e);
631       }
632     }
633 
634     if (rsCoprocessorHost != null) {
635       rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
636     }
637 
638     // Leaving here, the mergedir with its dross will be in place but since the
639     // merge was successful, just leave it; it'll be cleaned when region_a is
640     // cleaned up by CatalogJanitor on master
641   }
642 
643   /**
644    * Wait for the merging node to be transitioned from pending_merge
645    * to merging by master. That's how we are sure master has processed
646    * the event and is good with us to move on. If we don't get any update,
647    * we periodically transition the node so that master gets the callback.
648    * If the node is removed or is not in pending_merge state any more,
649    * we abort the merge.
650    */
651   private int getZKNode(final Server server,
652       final RegionServerServices services) throws IOException {
653     // Wait for the master to process the pending_merge.
654     try {
655       int spins = 0;
656       Stat stat = new Stat();
657       ZooKeeperWatcher zkw = server.getZooKeeper();
658       ServerName expectedServer = server.getServerName();
659       String node = mergedRegionInfo.getEncodedName();
660       while (!(server.isStopped() || services.isStopping())) {
661         if (spins % 5 == 0) {
662           LOG.debug("Still waiting for master to process "
663             + "the pending_merge for " + node);
664           transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
665             region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
666             RS_ZK_REQUEST_REGION_MERGE);
667         }
668         Thread.sleep(100);
669         spins++;
670         byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
671         if (data == null) {
672           throw new IOException("Data is null, merging node "
673             + node + " no longer exists");
674         }
675         RegionTransition rt = RegionTransition.parseFrom(data);
676         EventType et = rt.getEventType();
677         if (et == RS_ZK_REGION_MERGING) {
678           ServerName serverName = rt.getServerName();
679           if (!serverName.equals(expectedServer)) {
680             throw new IOException("Merging node " + node + " is for "
681               + serverName + ", not us " + expectedServer);
682           }
683           byte [] payloadOfMerging = rt.getPayload();
684           List<HRegionInfo> mergingRegions = HRegionInfo.parseDelimitedFrom(
685             payloadOfMerging, 0, payloadOfMerging.length);
686           assert mergingRegions.size() == 3;
687           HRegionInfo a = mergingRegions.get(1);
688           HRegionInfo b = mergingRegions.get(2);
689           HRegionInfo hri_a = region_a.getRegionInfo();
690           HRegionInfo hri_b = region_b.getRegionInfo();
691           if (!(hri_a.equals(a) && hri_b.equals(b))) {
692             throw new IOException("Merging node " + node + " is for " + a + ", "
693               + b + ", not expected regions: " + hri_a + ", " + hri_b);
694           }
695           // Master has processed it.
696           return stat.getVersion();
697         }
698         if (et != RS_ZK_REQUEST_REGION_MERGE) {
699           throw new IOException("Merging node " + node
700             + " moved out of merging to " + et);
701         }
702       }
703       // Server is stopping/stopped
704       throw new IOException("Server is "
705         + (services.isStopping() ? "stopping" : "stopped"));
706     } catch (Exception e) {
707       if (e instanceof InterruptedException) {
708         Thread.currentThread().interrupt();
709       }
710       throw new IOException("Failed getting MERGING znode on "
711         + mergedRegionInfo.getRegionNameAsString(), e);
712     }
713   }
714 
715   /**
716    * Create reference file(s) of merging regions under the region_a merges dir
717    * @param hstoreFilesOfRegionA
718    * @param hstoreFilesOfRegionB
719    * @throws IOException
720    */
721   private void mergeStoreFiles(
722       Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
723       Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
724       throws IOException {
725     // Create reference file(s) of region A in mergdir
726     HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
727     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
728         .entrySet()) {
729       String familyName = Bytes.toString(entry.getKey());
730       for (StoreFile storeFile : entry.getValue()) {
731         fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
732             this.mergesdir);
733       }
734     }
735     // Create reference file(s) of region B in mergedir
736     HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
737     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
738         .entrySet()) {
739       String familyName = Bytes.toString(entry.getKey());
740       for (StoreFile storeFile : entry.getValue()) {
741         fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
742             this.mergesdir);
743       }
744     }
745   }
746 
747   /**
748    * @param server Hosting server instance (May be null when testing).
749    * @param services Services of regionserver, used to online regions.
750    * @throws IOException If thrown, rollback failed. Take drastic action.
751    * @return True if we successfully rolled back, false if we got to the point
752    *         of no return and so now need to abort the server to minimize
753    *         damage.
754    */
755   @SuppressWarnings("deprecation")
756   public boolean rollback(final Server server,
757       final RegionServerServices services) throws IOException {
758     assert this.mergedRegionInfo != null;
759     // Coprocessor callback
760     if (rsCoprocessorHost != null) {
761       rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
762     }
763 
764     boolean result = true;
765     ListIterator<JournalEntry> iterator = this.journal
766         .listIterator(this.journal.size());
767     // Iterate in reverse.
768     while (iterator.hasPrevious()) {
769       JournalEntry je = iterator.previous();
770       switch (je) {
771 
772         case SET_MERGING_IN_ZK:
773           if (useZKAndZKIsSet(server)) {
774             cleanZK(server, this.mergedRegionInfo);
775           } else if (services != null && !useZKForAssignment
776               && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED,
777                   mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
778             return false;
779           }
780           break;
781 
782         case CREATED_MERGE_DIR:
783           this.region_a.writestate.writesEnabled = true;
784           this.region_b.writestate.writesEnabled = true;
785           this.region_a.getRegionFileSystem().cleanupMergesDir();
786           break;
787 
788         case CLOSED_REGION_A:
789           try {
790             // So, this returns a seqid but if we just closed and then reopened,
791             // we should be ok. On close, we flushed using sequenceid obtained
792             // from hosting regionserver so no need to propagate the sequenceid
793             // returned out of initialize below up into regionserver as we
794             // normally do.
795             this.region_a.initialize();
796           } catch (IOException e) {
797             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
798                 + this.region_a.getRegionNameAsString(), e);
799             throw new RuntimeException(e);
800           }
801           break;
802 
803         case OFFLINED_REGION_A:
804           if (services != null)
805             services.addToOnlineRegions(this.region_a);
806           break;
807 
808         case CLOSED_REGION_B:
809           try {
810             this.region_b.initialize();
811           } catch (IOException e) {
812             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
813                 + this.region_b.getRegionNameAsString(), e);
814             throw new RuntimeException(e);
815           }
816           break;
817 
818         case OFFLINED_REGION_B:
819           if (services != null)
820             services.addToOnlineRegions(this.region_b);
821           break;
822 
823         case STARTED_MERGED_REGION_CREATION:
824           this.region_a.getRegionFileSystem().cleanupMergedRegion(
825               this.mergedRegionInfo);
826           break;
827 
828         case PONR:
829           // We got to the point-of-no-return so we need to just abort. Return
830           // immediately. Do not clean up created merged regions.
831           return false;
832 
833         default:
834           throw new RuntimeException("Unhandled journal entry: " + je);
835       }
836     }
837     // Coprocessor callback
838     if (rsCoprocessorHost != null) {
839       rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
840     }
841 
842     return result;
843   }
844 
845   HRegionInfo getMergedRegionInfo() {
846     return this.mergedRegionInfo;
847   }
848 
849   // For unit testing.
850   Path getMergesDir() {
851     return this.mergesdir;
852   }
853 
854   private boolean useZKAndZKIsSet(final Server server) {
855     return server != null && useZKForAssignment && server.getZooKeeper() != null;
856   }
857 
858   private static void cleanZK(final Server server, final HRegionInfo hri) {
859     try {
860       // Only delete if its in expected state; could have been hijacked.
861       if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
862           RS_ZK_REQUEST_REGION_MERGE, server.getServerName())) {
863         ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
864           RS_ZK_REGION_MERGING, server.getServerName());
865       }
866     } catch (KeeperException.NoNodeException e) {
867       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
868     } catch (KeeperException e) {
869       server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
870     }
871   }
872 
873   /**
874    * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
875    * Create it ephemeral in case regionserver dies mid-merge.
876    *
877    * <p>
878    * Does not transition nodes from other states. If a node already exists for
879    * this region, a {@link NodeExistsException} will be thrown.
880    *
881    * @param zkw zk reference
882    * @param region region to be created as offline
883    * @param serverName server event originates from
884    * @throws KeeperException
885    * @throws IOException
886    */
887   public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
888       final ServerName serverName, final HRegionInfo a,
889       final HRegionInfo b) throws KeeperException, IOException {
890     LOG.debug(zkw.prefix("Creating ephemeral node for "
891       + region.getEncodedName() + " in PENDING_MERGE state"));
892     byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
893     RegionTransition rt = RegionTransition.createRegionTransition(
894       RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
895     String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
896     if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
897       throw new IOException("Failed create of ephemeral " + node);
898     }
899   }
900 
901   /**
902    * Transitions an existing ephemeral node for the specified region which is
903    * currently in the begin state to be in the end state. Master cleans up the
904    * final MERGE znode when it reads it (or if we crash, zk will clean it up).
905    *
906    * <p>
907    * Does not transition nodes from other states. If for some reason the node
908    * could not be transitioned, the method returns -1. If the transition is
909    * successful, the version of the node after transition is returned.
910    *
911    * <p>
912    * This method can fail and return false for three different reasons:
913    * <ul>
914    * <li>Node for this region does not exist</li>
915    * <li>Node for this region is not in the begin state</li>
916    * <li>After verifying the begin state, update fails because of wrong version
917    * (this should never actually happen since an RS only does this transition
918    * following a transition to the begin state. If two RS are conflicting, one would
919    * fail the original transition to the begin state and not this transition)</li>
920    * </ul>
921    *
922    * <p>
923    * Does not set any watches.
924    *
925    * <p>
926    * This method should only be used by a RegionServer when merging two regions.
927    *
928    * @param zkw zk reference
929    * @param merged region to be transitioned to opened
930    * @param a merging region A
931    * @param b merging region B
932    * @param serverName server event originates from
933    * @param znodeVersion expected version of data before modification
934    * @param beginState the expected current state the znode should be
935    * @param endState the state to be transition to
936    * @return version of node after transition, -1 if unsuccessful transition
937    * @throws KeeperException if unexpected zookeeper exception
938    * @throws IOException
939    */
940   public static int transitionMergingNode(ZooKeeperWatcher zkw,
941       HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
942       final int znodeVersion, final EventType beginState,
943       final EventType endState) throws KeeperException, IOException {
944     byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
945     return ZKAssign.transitionNode(zkw, merged, serverName,
946       beginState, endState, znodeVersion, payload);
947   }
948 
949   /**
950    * Checks if the given region has merge qualifier in hbase:meta
951    * @param services
952    * @param regionName name of specified region
953    * @return true if the given region has merge qualifier in META.(It will be
954    *         cleaned by CatalogJanitor)
955    * @throws IOException
956    */
957   boolean hasMergeQualifierInMeta(final RegionServerServices services,
958       final byte[] regionName) throws IOException {
959     if (services == null) return false;
960     // Get merge regions if it is a merged region and already has merge
961     // qualifier
962     Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader
963         .getRegionsFromMergeQualifier(services.getCatalogTracker(), regionName);
964     if (mergeRegions != null &&
965         (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
966       // It has merge qualifier
967       return true;
968     }
969     return false;
970   }
971 }