View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
22  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
23  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
24  
25  import java.io.IOException;
26  import java.io.InterruptedIOException;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.ListIterator;
30  import java.util.Map;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.RegionTransition;
46  import org.apache.hadoop.hbase.Server;
47  import org.apache.hadoop.hbase.ServerName;
48  import org.apache.hadoop.hbase.catalog.CatalogTracker;
49  import org.apache.hadoop.hbase.catalog.MetaEditor;
50  import org.apache.hadoop.hbase.client.Mutation;
51  import org.apache.hadoop.hbase.client.Put;
52  import org.apache.hadoop.hbase.executor.EventType;
53  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.CancelableProgressable;
56  import org.apache.hadoop.hbase.util.ConfigUtil;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.hadoop.hbase.util.FSUtils;
59  import org.apache.hadoop.hbase.util.HasThread;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.hbase.util.PairOfSameType;
62  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
63  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
64  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
65  import org.apache.zookeeper.KeeperException;
66  import org.apache.zookeeper.KeeperException.NodeExistsException;
67  import org.apache.zookeeper.data.Stat;
68  
69  import com.google.common.util.concurrent.ThreadFactoryBuilder;
70  
/**
 * Executes region split as a "transaction".  Call {@link #prepare()} to set up
 * the transaction, {@link #execute(Server, RegionServerServices)} to run the
 * transaction and {@link #rollback(Server, RegionServerServices)} to clean up if execute fails.
 *
 * <p>Here is an example of how you would use this class:
 * <pre>
 *  SplitTransaction st = new SplitTransaction(parent, midKey);
 *  if (!st.prepare()) return;
 *  try {
 *    st.execute(server, services);
 *  } catch (IOException ioe) {
 *    try {
 *      st.rollback(server, services);
 *      return;
 *    } catch (RuntimeException e) {
 *      myAbortable.abort("Failed split, abort");
 *    }
 *  }
 * </pre>
 * <p>This class is not thread safe.  Callers need to ensure the split is run by
 * one thread only.
 */
94  @InterfaceAudience.Private
95  public class SplitTransaction {
96    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
97  
98    /*
99     * Region to split
100    */
101   private final HRegion parent;
102   private HRegionInfo hri_a;
103   private HRegionInfo hri_b;
104   private long fileSplitTimeout = 30000;
105   private int znodeVersion = -1;
106   boolean useZKForAssignment;
107 
108   /*
109    * Row to split around
110    */
111   private final byte [] splitrow;
112 
113   /**
114    * Types to add to the transaction journal.
115    * Each enum is a step in the split transaction. Used to figure how much
116    * we need to rollback.
117    */
118   static enum JournalEntryType {
119     /**
120      * Started
121      */
122     STARTED,
123     /**
124      * Prepared (after table lock)
125      */
126     PREPARED,
127     /**
128      * Before preSplit coprocessor hook
129      */
130     BEFORE_PRE_SPLIT_HOOK,
131     /**
132      * After preSplit coprocessor hook
133      */
134     AFTER_PRE_SPLIT_HOOK,
135     /**
136      * Set region as in transition, set it into SPLITTING state.
137      */
138     SET_SPLITTING_IN_ZK,
139     /**
140      * We created the temporary split data directory.
141      */
142     CREATE_SPLIT_DIR,
143     /**
144      * Closed the parent region.
145      */
146     CLOSED_PARENT_REGION,
147     /**
148      * The parent has been taken out of the server's online regions list.
149      */
150     OFFLINED_PARENT,
151     /**
152      * Started in on creation of the first daughter region.
153      */
154     STARTED_REGION_A_CREATION,
155     /**
156      * Started in on the creation of the second daughter region.
157      */
158     STARTED_REGION_B_CREATION,
159     /**
160      * Opened the first daughter region
161      */
162     OPENED_REGION_A,
163     /**
164      * Opened the second daughter region
165      */
166     OPENED_REGION_B,
167     /**
168      * Before postSplit coprocessor hook
169      */
170     BEFORE_POST_SPLIT_HOOK,
171     /**
172      * After postSplit coprocessor hook
173      */
174     AFTER_POST_SPLIT_HOOK,
175     /**
176      * Point of no return.
177      * If we got here, then transaction is not recoverable other than by
178      * crashing out the regionserver.
179      */
180     PONR
181   }
182 
183   static class JournalEntry {
184     private JournalEntryType type;
185     private long timestamp;
186 
187     public JournalEntry(JournalEntryType type) {
188       this(type, EnvironmentEdgeManager.currentTimeMillis());
189     }
190 
191     public JournalEntry(JournalEntryType type, long timestamp) {
192       this.type = type;
193       this.timestamp = timestamp;
194     }
195 
196     @Override
197     public String toString() {
198       StringBuilder sb = new StringBuilder();
199       sb.append(type);
200       sb.append(" at ");
201       sb.append(timestamp);
202       return sb.toString();
203     }
204   }
205 
206   /*
207    * Journal of how far the split transaction has progressed.
208    */
209   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
210 
211   /**
212    * Constructor
213    * @param r Region to split
214    * @param splitrow Row to split around
215    */
216   public SplitTransaction(final HRegion r, final byte [] splitrow) {
217     this.parent = r;
218     this.splitrow = splitrow;
219     this.journal.add(new JournalEntry(JournalEntryType.STARTED));
220     this.useZKForAssignment = ConfigUtil.useZKForAssignment(r.getBaseConf());
221   }
222 
223   /**
224    * Does checks on split inputs.
225    * @return <code>true</code> if the region is splittable else
226    * <code>false</code> if it is not (e.g. its already closed, etc.).
227    */
228   public boolean prepare() {
229     if (!this.parent.isSplittable()) return false;
230     // Split key can be null if this region is unsplittable; i.e. has refs.
231     if (this.splitrow == null) return false;
232     HRegionInfo hri = this.parent.getRegionInfo();
233     parent.prepareToSplit();
234     // Check splitrow.
235     byte [] startKey = hri.getStartKey();
236     byte [] endKey = hri.getEndKey();
237     if (Bytes.equals(startKey, splitrow) ||
238         !this.parent.getRegionInfo().containsRow(splitrow)) {
239       LOG.info("Split row is not inside region key range or is equal to " +
240           "startkey: " + Bytes.toStringBinary(this.splitrow));
241       return false;
242     }
243     long rid = getDaughterRegionIdTimestamp(hri);
244     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
245     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
246     this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
247     return true;
248   }
249 
250   /**
251    * Calculate daughter regionid to use.
252    * @param hri Parent {@link HRegionInfo}
253    * @return Daughter region id (timestamp) to use.
254    */
255   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
256     long rid = EnvironmentEdgeManager.currentTimeMillis();
257     // Regionid is timestamp.  Can't be less than that of parent else will insert
258     // at wrong location in hbase:meta (See HBASE-710).
259     if (rid < hri.getRegionId()) {
260       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
261         " but current time here is " + rid);
262       rid = hri.getRegionId() + 1;
263     }
264     return rid;
265   }
266 
267   private static IOException closedByOtherException = new IOException(
268       "Failed to close region: already closed by another thread");
269 
270   /**
271    * Prepare the regions and region files.
272    * @param server Hosting server instance.  Can be null when testing (won't try
273    * and update in zk if a null server)
274    * @param services Used to online/offline regions.
275    * @throws IOException If thrown, transaction failed.
276    *    Call {@link #rollback(Server, RegionServerServices)}
277    * @return Regions created
278    */
279   /* package */PairOfSameType<HRegion> createDaughters(final Server server,
280       final RegionServerServices services) throws IOException {
281     LOG.info("Starting split of region " + this.parent);
282     if ((server != null && server.isStopped()) ||
283         (services != null && services.isStopping())) {
284       throw new IOException("Server is stopped or stopping");
285     }
286     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
287       "Unsafe to hold write lock while performing RPCs";
288 
289     journal.add(new JournalEntry(JournalEntryType.BEFORE_PRE_SPLIT_HOOK));
290 
291     // Coprocessor callback
292     if (this.parent.getCoprocessorHost() != null) {
293       // TODO: Remove one of these
294       this.parent.getCoprocessorHost().preSplit();
295       this.parent.getCoprocessorHost().preSplit(this.splitrow);
296     }
297 
298     journal.add(new JournalEntry(JournalEntryType.AFTER_PRE_SPLIT_HOOK));
299 
300     // If true, no cluster to write meta edits to or to update znodes in.
301     boolean testing = server == null? true:
302         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
303     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
304         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
305           this.fileSplitTimeout);
306 
307     PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
308 
309     List<Mutation> metaEntries = new ArrayList<Mutation>();
310     if (this.parent.getCoprocessorHost() != null) {
311       if (this.parent.getCoprocessorHost().
312           preSplitBeforePONR(this.splitrow, metaEntries)) {
313         throw new IOException("Coprocessor bypassing region "
314             + this.parent.getRegionNameAsString() + " split.");
315       }
316       try {
317         for (Mutation p : metaEntries) {
318           HRegionInfo.parseRegionName(p.getRow());
319         }
320       } catch (IOException e) {
321         LOG.error("Row key of mutation from coprossor is not parsable as region name."
322             + "Mutations from coprocessor should only for hbase:meta table.");
323         throw e;
324       }
325     }
326 
327     // This is the point of no return.  Adding subsequent edits to .META. as we
328     // do below when we do the daughter opens adding each to .META. can fail in
329     // various interesting ways the most interesting of which is a timeout
330     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
331     // then subsequent failures need to crash out this regionserver; the
332     // server shutdown processing should be able to fix-up the incomplete split.
333     // The offlined parent will have the daughters as extra columns.  If
334     // we leave the daughter regions in place and do not remove them when we
335     // crash out, then they will have their references to the parent in place
336     // still and the server shutdown fixup of .META. will point to these
337     // regions.
338     // We should add PONR JournalEntry before offlineParentInMeta,so even if
339     // OfflineParentInMeta timeout,this will cause regionserver exit,and then
340     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
341     // HBase-4562).
342     this.journal.add(new JournalEntry(JournalEntryType.PONR));
343 
344     // Edit parent in meta.  Offlines parent region and adds splita and splitb
345     // as an atomic update. See HBASE-7721. This update to META makes the region
346     // will determine whether the region is split or not in case of failures.
347     // If it is successful, master will roll-forward, if not, master will rollback
348     // and assign the parent region.
349     if (!testing && useZKForAssignment) {
350       if (metaEntries == null || metaEntries.isEmpty()) {
351         MetaEditor.splitRegion(server.getCatalogTracker(), parent.getRegionInfo(), daughterRegions
352             .getFirst().getRegionInfo(), daughterRegions.getSecond().getRegionInfo(), server
353             .getServerName());
354       } else {
355         offlineParentInMetaAndputMetaEntries(server.getCatalogTracker(), parent.getRegionInfo(),
356           daughterRegions.getFirst().getRegionInfo(), daughterRegions.getSecond().getRegionInfo(),
357           server.getServerName(), metaEntries);
358       }
359     } else if (services != null && !useZKForAssignment) {
360       if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR, parent.getRegionInfo(),
361         hri_a, hri_b)) {
362         // Passed PONR, let SSH clean it up
363         throw new IOException("Failed to notify master that split passed PONR: "
364             + parent.getRegionInfo().getRegionNameAsString());
365       }
366     }
367     return daughterRegions;
368   }
369 
370   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
371       final RegionServerServices services, boolean testing) throws IOException {
372     // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
373     // have zookeeper so don't do zk stuff if server or zookeeper is null
374     if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
375       try {
376         createNodeSplitting(server.getZooKeeper(),
377           parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
378       } catch (KeeperException e) {
379         throw new IOException("Failed creating PENDING_SPLIT znode on " +
380           this.parent.getRegionNameAsString(), e);
381       }
382     } else if (services != null && !useZKForAssignment) {
383       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
384         parent.getRegionInfo(), hri_a, hri_b)) {
385         throw new IOException("Failed to get ok from master to split "
386             + parent.getRegionNameAsString());
387       }
388     }
389     this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING_IN_ZK));
390     if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
391       // After creating the split node, wait for master to transition it
392       // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
393       // knows about it and won't transition any region which is splitting.
394       znodeVersion = getZKNode(server, services);
395     }
396 
397     this.parent.getRegionFileSystem().createSplitsDir();
398     this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
399 
400     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
401     Exception exceptionToThrow = null;
402     try{
403       hstoreFilesToSplit = this.parent.close(false);
404     } catch (Exception e) {
405       exceptionToThrow = e;
406     }
407     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
408       // The region was closed by a concurrent thread.  We can't continue
409       // with the split, instead we must just abandon the split.  If we
410       // reopen or split this could cause problems because the region has
411       // probably already been moved to a different server, or is in the
412       // process of moving to a different server.
413       exceptionToThrow = closedByOtherException;
414     }
415     if (exceptionToThrow != closedByOtherException) {
416       this.journal.add(new JournalEntry(JournalEntryType.CLOSED_PARENT_REGION));
417     }
418     if (exceptionToThrow != null) {
419       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
420       throw new IOException(exceptionToThrow);
421     }
422     if (!testing) {
423       services.removeFromOnlineRegions(this.parent, null);
424     }
425     this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
426 
427     // TODO: If splitStoreFiles were multithreaded would we complete steps in
428     // less elapsed time?  St.Ack 20100920
429     //
430     // splitStoreFiles creates daughter region dirs under the parent splits dir
431     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
432     // clean this up.
433     Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
434 
435     // Log to the journal that we are creating region A, the first daughter
436     // region.  We could fail halfway through.  If we do, we could have left
437     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
438     // add entry to journal BEFORE rather than AFTER the change.
439     this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
440     assertReferenceFileCount(expectedReferences.getFirst(),
441         this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
442     HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
443     assertReferenceFileCount(expectedReferences.getFirst(),
444         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
445 
446     // Ditto
447     this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
448     assertReferenceFileCount(expectedReferences.getSecond(),
449         this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
450     HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
451     assertReferenceFileCount(expectedReferences.getSecond(),
452         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
453 
454     return new PairOfSameType<HRegion>(a, b);
455   }
456 
457   void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
458       throws IOException {
459     if (expectedReferenceFileCount != 0 &&
460         expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(this.parent.getFilesystem(), dir)) {
461       throw new IOException("Failing split. Expected reference file count isn't equal.");
462     }
463   }
464 
465   /**
466    * Perform time consuming opening of the daughter regions.
467    * @param server Hosting server instance.  Can be null when testing (won't try
468    * and update in zk if a null server)
469    * @param services Used to online/offline regions.
470    * @param a first daughter region
471    * @param a second daughter region
472    * @throws IOException If thrown, transaction failed.
473    *          Call {@link #rollback(Server, RegionServerServices)}
474    */
475   /* package */void openDaughters(final Server server,
476       final RegionServerServices services, HRegion a, HRegion b)
477       throws IOException {
478     boolean stopped = server != null && server.isStopped();
479     boolean stopping = services != null && services.isStopping();
480     // TODO: Is this check needed here?
481     if (stopped || stopping) {
482       LOG.info("Not opening daughters " +
483           b.getRegionInfo().getRegionNameAsString() +
484           " and " +
485           a.getRegionInfo().getRegionNameAsString() +
486           " because stopping=" + stopping + ", stopped=" + stopped);
487     } else {
488       // Open daughters in parallel.
489       DaughterOpener aOpener = new DaughterOpener(server, a);
490       DaughterOpener bOpener = new DaughterOpener(server, b);
491       aOpener.start();
492       bOpener.start();
493       try {
494         aOpener.join();
495         if (aOpener.getException() == null) {
496           journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_A));
497         }
498         bOpener.join();
499         if (bOpener.getException() == null) {
500           journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_B));
501         }
502       } catch (InterruptedException e) {
503         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
504       }
505       if (aOpener.getException() != null) {
506         throw new IOException("Failed " +
507           aOpener.getName(), aOpener.getException());
508       }
509       if (bOpener.getException() != null) {
510         throw new IOException("Failed " +
511           bOpener.getName(), bOpener.getException());
512       }
513       if (services != null) {
514         try {
515           if (useZKForAssignment) {
516             // add 2nd daughter first (see HBASE-4335)
517             services.postOpenDeployTasks(b, server.getCatalogTracker());
518           } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
519               parent.getRegionInfo(), hri_a, hri_b)) {
520             throw new IOException("Failed to report split region to master: "
521               + parent.getRegionInfo().getShortNameToLog());
522           }
523           // Should add it to OnlineRegions
524           services.addToOnlineRegions(b);
525           if (useZKForAssignment) {
526             services.postOpenDeployTasks(a, server.getCatalogTracker());
527           }
528           services.addToOnlineRegions(a);
529         } catch (KeeperException ke) {
530           throw new IOException(ke);
531         }
532       }
533     }
534   }
535 
536   /**
537    * Finish off split transaction, transition the zknode
538    * @param server Hosting server instance.  Can be null when testing (won't try
539    * and update in zk if a null server)
540    * @param services Used to online/offline regions.
541    * @param a first daughter region
542    * @param a second daughter region
543    * @throws IOException If thrown, transaction failed.
544    *          Call {@link #rollback(Server, RegionServerServices)}
545    */
546   /* package */void transitionZKNode(final Server server,
547       final RegionServerServices services, HRegion a, HRegion b)
548       throws IOException {
549     // Tell master about split by updating zk.  If we fail, abort.
550     if (server != null && server.getZooKeeper() != null) {
551       try {
552         this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
553           parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
554           server.getServerName(), this.znodeVersion,
555           RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
556 
557         int spins = 0;
558         // Now wait for the master to process the split. We know it's done
559         // when the znode is deleted. The reason we keep tickling the znode is
560         // that it's possible for the master to miss an event.
561         do {
562           if (spins % 10 == 0) {
563             LOG.debug("Still waiting on the master to process the split for " +
564                 this.parent.getRegionInfo().getEncodedName());
565           }
566           Thread.sleep(100);
567           // When this returns -1 it means the znode doesn't exist
568           this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
569             parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
570             server.getServerName(), this.znodeVersion,
571             RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
572           spins++;
573         } while (this.znodeVersion != -1 && !server.isStopped()
574             && !services.isStopping());
575       } catch (Exception e) {
576         if (e instanceof InterruptedException) {
577           Thread.currentThread().interrupt();
578         }
579         throw new IOException("Failed telling master about split", e);
580       }
581     }
582 
583     
584 
585     // Leaving here, the splitdir with its dross will be in place but since the
586     // split was successful, just leave it; it'll be cleaned when parent is
587     // deleted and cleaned up.
588   }
589 
590   /**
591    * Wait for the splitting node to be transitioned from pending_split
592    * to splitting by master. That's how we are sure master has processed
593    * the event and is good with us to move on. If we don't get any update,
594    * we periodically transition the node so that master gets the callback.
595    * If the node is removed or is not in pending_split state any more,
596    * we abort the split.
597    */
598   private int getZKNode(final Server server,
599       final RegionServerServices services) throws IOException {
600     // Wait for the master to process the pending_split.
601     try {
602       int spins = 0;
603       Stat stat = new Stat();
604       ZooKeeperWatcher zkw = server.getZooKeeper();
605       ServerName expectedServer = server.getServerName();
606       String node = parent.getRegionInfo().getEncodedName();
607       while (!(server.isStopped() || services.isStopping())) {
608         if (spins % 5 == 0) {
609           LOG.debug("Still waiting for master to process "
610             + "the pending_split for " + node);
611           transitionSplittingNode(zkw, parent.getRegionInfo(),
612             hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
613             RS_ZK_REQUEST_REGION_SPLIT);
614         }
615         Thread.sleep(100);
616         spins++;
617         byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
618         if (data == null) {
619           throw new IOException("Data is null, splitting node "
620             + node + " no longer exists");
621         }
622         RegionTransition rt = RegionTransition.parseFrom(data);
623         EventType et = rt.getEventType();
624         if (et == RS_ZK_REGION_SPLITTING) {
625           ServerName serverName = rt.getServerName();
626           if (!serverName.equals(expectedServer)) {
627             throw new IOException("Splitting node " + node + " is for "
628               + serverName + ", not us " + expectedServer);
629           }
630           byte [] payloadOfSplitting = rt.getPayload();
631           List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
632             payloadOfSplitting, 0, payloadOfSplitting.length);
633           assert splittingRegions.size() == 2;
634           HRegionInfo a = splittingRegions.get(0);
635           HRegionInfo b = splittingRegions.get(1);
636           if (!(hri_a.equals(a) && hri_b.equals(b))) {
637             throw new IOException("Splitting node " + node + " is for " + a + ", "
638               + b + ", not expected daughters: " + hri_a + ", " + hri_b);
639           }
640           // Master has processed it.
641           return stat.getVersion();
642         }
643         if (et != RS_ZK_REQUEST_REGION_SPLIT) {
644           throw new IOException("Splitting node " + node
645             + " moved out of splitting to " + et);
646         }
647       }
648       // Server is stopping/stopped
649       throw new IOException("Server is "
650         + (services.isStopping() ? "stopping" : "stopped"));
651     } catch (Exception e) {
652       if (e instanceof InterruptedException) {
653         Thread.currentThread().interrupt();
654       }
655       throw new IOException("Failed getting SPLITTING znode on "
656         + parent.getRegionNameAsString(), e);
657     }
658   }
659 
660   /**
661    * Run the transaction.
662    * @param server Hosting server instance.  Can be null when testing (won't try
663    * and update in zk if a null server)
664    * @param services Used to online/offline regions.
665    * @throws IOException If thrown, transaction failed.
666    *          Call {@link #rollback(Server, RegionServerServices)}
667    * @return Regions created
668    * @throws IOException
669    * @see #rollback(Server, RegionServerServices)
670    */
671   public PairOfSameType<HRegion> execute(final Server server,
672       final RegionServerServices services)
673   throws IOException {
674     useZKForAssignment =
675         server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
676     PairOfSameType<HRegion> regions = createDaughters(server, services);
677     if (this.parent.getCoprocessorHost() != null) {
678       this.parent.getCoprocessorHost().preSplitAfterPONR();
679     }
680     return stepsAfterPONR(server, services, regions);
681   }
682 
683   public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
684       final RegionServerServices services, PairOfSameType<HRegion> regions)
685       throws IOException {
686     openDaughters(server, services, regions.getFirst(), regions.getSecond());
687     if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
688       transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
689     }
690     journal.add(new JournalEntry(JournalEntryType.BEFORE_POST_SPLIT_HOOK));
691     // Coprocessor callback
692     if (this.parent.getCoprocessorHost() != null) {
693       this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
694     }
695     journal.add(new JournalEntry(JournalEntryType.AFTER_POST_SPLIT_HOOK));
696     return regions;
697   }
698 
699   private void offlineParentInMetaAndputMetaEntries(CatalogTracker catalogTracker,
700       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
701       ServerName serverName, List<Mutation> metaEntries) throws IOException {
702     List<Mutation> mutations = metaEntries;
703     HRegionInfo copyOfParent = new HRegionInfo(parent);
704     copyOfParent.setOffline(true);
705     copyOfParent.setSplit(true);
706 
707     //Put for parent
708     Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
709     MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
710     mutations.add(putParent);
711     
712     //Puts for daughters
713     Put putA = MetaEditor.makePutFromRegionInfo(splitA);
714     Put putB = MetaEditor.makePutFromRegionInfo(splitB);
715 
716     addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
717     addLocation(putB, serverName, 1);
718     mutations.add(putA);
719     mutations.add(putB);
720     MetaEditor.mutateMetaTable(catalogTracker, mutations);
721   }
722 
723   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
724     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
725       Bytes.toBytes(sn.getHostAndPort()));
726     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
727       Bytes.toBytes(sn.getStartcode()));
728     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
729         Bytes.toBytes(openSeqNum));
730     return p;
731   }
732 
733   /*
734    * Open daughter region in its own thread.
735    * If we fail, abort this hosting server.
736    */
737   class DaughterOpener extends HasThread {
738     private final Server server;
739     private final HRegion r;
740     private Throwable t = null;
741 
742     DaughterOpener(final Server s, final HRegion r) {
743       super((s == null? "null-services": s.getServerName()) +
744         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
745       setDaemon(true);
746       this.server = s;
747       this.r = r;
748     }
749 
750     /**
751      * @return Null if open succeeded else exception that causes us fail open.
752      * Call it after this thread exits else you may get wrong view on result.
753      */
754     Throwable getException() {
755       return this.t;
756     }
757 
758     @Override
759     public void run() {
760       try {
761         openDaughterRegion(this.server, r);
762       } catch (Throwable t) {
763         this.t = t;
764       }
765     }
766   }
767 
768   /**
769    * Open daughter regions, add them to online list and update meta.
770    * @param server
771    * @param daughter
772    * @throws IOException
773    * @throws KeeperException
774    */
775   void openDaughterRegion(final Server server, final HRegion daughter)
776   throws IOException, KeeperException {
777     HRegionInfo hri = daughter.getRegionInfo();
778     LoggingProgressable reporter = server == null ? null
779         : new LoggingProgressable(hri, server.getConfiguration().getLong(
780             "hbase.regionserver.split.daughter.open.log.interval", 10000));
781     daughter.openHRegion(reporter);
782   }
783 
784   static class LoggingProgressable implements CancelableProgressable {
785     private final HRegionInfo hri;
786     private long lastLog = -1;
787     private final long interval;
788 
789     LoggingProgressable(final HRegionInfo hri, final long interval) {
790       this.hri = hri;
791       this.interval = interval;
792     }
793 
794     @Override
795     public boolean progress() {
796       long now = EnvironmentEdgeManager.currentTimeMillis();
797       if (now - lastLog > this.interval) {
798         LOG.info("Opening " + this.hri.getRegionNameAsString());
799         this.lastLog = now;
800       }
801       return true;
802     }
803   }
804 
805 
806   /**
807    * Creates reference files for top and bottom half of the
808    * @param hstoreFilesToSplit map of store files to create half file references for.
809    * @return the number of reference files that were created.
810    * @throws IOException
811    */
812   private Pair<Integer, Integer> splitStoreFiles(
813       final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
814       throws IOException {
815     if (hstoreFilesToSplit == null) {
816       // Could be null because close didn't succeed -- for now consider it fatal
817       throw new IOException("Close returned empty list of StoreFiles");
818     }
819     // The following code sets up a thread pool executor with as many slots as
820     // there's files to split. It then fires up everything, waits for
821     // completion and finally checks for any exception
822     int nbFiles = hstoreFilesToSplit.size();
823     if (nbFiles == 0) {
824       // no file needs to be splitted.
825       return new Pair<Integer, Integer>(0,0);
826     }
827     LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
828     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
829     builder.setNameFormat("StoreFileSplitter-%1$d");
830     ThreadFactory factory = builder.build();
831     ThreadPoolExecutor threadPool =
832       (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
833     List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
834 
835     // Split each store file.
836     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
837       for (StoreFile sf: entry.getValue()) {
838         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
839         futures.add(threadPool.submit(sfs));
840       }
841     }
842     // Shutdown the pool
843     threadPool.shutdown();
844 
845     // Wait for all the tasks to finish
846     try {
847       boolean stillRunning = !threadPool.awaitTermination(
848           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
849       if (stillRunning) {
850         threadPool.shutdownNow();
851         // wait for the thread to shutdown completely.
852         while (!threadPool.isTerminated()) {
853           Thread.sleep(50);
854         }
855         throw new IOException("Took too long to split the" +
856             " files and create the references, aborting split");
857       }
858     } catch (InterruptedException e) {
859       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
860     }
861 
862     int created_a = 0;
863     int created_b = 0;
864     // Look for any exception
865     for (Future<Pair<Path, Path>> future : futures) {
866       try {
867         Pair<Path, Path> p = future.get();
868         created_a += p.getFirst() != null ? 1 : 0;
869         created_b += p.getSecond() != null ? 1 : 0;
870       } catch (InterruptedException e) {
871         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
872       } catch (ExecutionException e) {
873         throw new IOException(e);
874       }
875     }
876 
877     if (LOG.isDebugEnabled()) {
878       LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
879           + " storefiles, Daugther B: " + created_b + " storefiles.");
880     }
881     return new Pair<Integer, Integer>(created_a, created_b);
882   }
883 
884   private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
885     HRegionFileSystem fs = this.parent.getRegionFileSystem();
886     String familyName = Bytes.toString(family);
887     Path path_a =
888         fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
889           this.parent.getSplitPolicy());
890     Path path_b =
891         fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
892           this.parent.getSplitPolicy());
893     return new Pair<Path,Path>(path_a, path_b);
894   }
895 
896   /**
897    * Utility class used to do the file splitting / reference writing
898    * in parallel instead of sequentially.
899    */
900   class StoreFileSplitter implements Callable<Pair<Path,Path>> {
901     private final byte[] family;
902     private final StoreFile sf;
903 
904     /**
905      * Constructor that takes what it needs to split
906      * @param family Family that contains the store file
907      * @param sf which file
908      */
909     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
910       this.sf = sf;
911       this.family = family;
912     }
913 
914     public Pair<Path,Path> call() throws IOException {
915       return splitStoreFile(family, sf);
916     }
917   }
918 
919   /**
920    * @param server Hosting server instance (May be null when testing).
921    * @param services
922    * @throws IOException If thrown, rollback failed.  Take drastic action.
923    * @return True if we successfully rolled back, false if we got to the point
924    * of no return and so now need to abort the server to minimize damage.
925    */
926   @SuppressWarnings("deprecation")
927   public boolean rollback(final Server server, final RegionServerServices services)
928   throws IOException {
929     // Coprocessor callback
930     if (this.parent.getCoprocessorHost() != null) {
931       this.parent.getCoprocessorHost().preRollBackSplit();
932     }
933 
934     boolean result = true;
935     ListIterator<JournalEntry> iterator =
936       this.journal.listIterator(this.journal.size());
937     // Iterate in reverse.
938     while (iterator.hasPrevious()) {
939       JournalEntry je = iterator.previous();
940       switch(je.type) {
941 
942       case SET_SPLITTING_IN_ZK:
943         if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
944           cleanZK(server, this.parent.getRegionInfo());
945         } else if (services != null
946             && !useZKForAssignment
947             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
948               parent.getRegionInfo(), hri_a, hri_b)) {
949           return false;
950         }
951         break;
952 
953       case CREATE_SPLIT_DIR:
954         this.parent.writestate.writesEnabled = true;
955         this.parent.getRegionFileSystem().cleanupSplitsDir();
956         break;
957 
958       case CLOSED_PARENT_REGION:
959         try {
960           // So, this returns a seqid but if we just closed and then reopened, we
961           // should be ok. On close, we flushed using sequenceid obtained from
962           // hosting regionserver so no need to propagate the sequenceid returned
963           // out of initialize below up into regionserver as we normally do.
964           // TODO: Verify.
965           this.parent.initialize();
966         } catch (IOException e) {
967           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
968             this.parent.getRegionNameAsString(), e);
969           throw new RuntimeException(e);
970         }
971         break;
972 
973       case STARTED_REGION_A_CREATION:
974         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
975         break;
976 
977       case STARTED_REGION_B_CREATION:
978         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
979         break;
980 
981       case OFFLINED_PARENT:
982         if (services != null) services.addToOnlineRegions(this.parent);
983         break;
984 
985       case PONR:
986         // We got to the point-of-no-return so we need to just abort. Return
987         // immediately.  Do not clean up created daughter regions.  They need
988         // to be in place so we don't delete the parent region mistakenly.
989         // See HBASE-3872.
990         return false;
991 
992       // Informational only cases
993       case STARTED:
994       case PREPARED:
995       case BEFORE_PRE_SPLIT_HOOK:
996       case AFTER_PRE_SPLIT_HOOK:
997       case BEFORE_POST_SPLIT_HOOK:
998       case AFTER_POST_SPLIT_HOOK:
999       case OPENED_REGION_A:
1000       case OPENED_REGION_B:
1001         break;
1002 
1003       default:
1004         throw new RuntimeException("Unhandled journal entry: " + je);
1005       }
1006     }
1007     // Coprocessor callback
1008     if (this.parent.getCoprocessorHost() != null) {
1009       this.parent.getCoprocessorHost().postRollBackSplit();
1010     }
1011     return result;
1012   }
1013 
1014   HRegionInfo getFirstDaughter() {
1015     return hri_a;
1016   }
1017 
1018   HRegionInfo getSecondDaughter() {
1019     return hri_b;
1020   }
1021 
1022   private static void cleanZK(final Server server, final HRegionInfo hri) {
1023     try {
1024       // Only delete if its in expected state; could have been hijacked.
1025       if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
1026           RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) {
1027         ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
1028           RS_ZK_REGION_SPLITTING, server.getServerName());
1029       }
1030     } catch (KeeperException.NoNodeException e) {
1031       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
1032     } catch (KeeperException e) {
1033       server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
1034     }
1035   }
1036 
1037   /**
1038    * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
1039    * Create it ephemeral in case regionserver dies mid-split.
1040    *
1041    * <p>Does not transition nodes from other states.  If a node already exists
1042    * for this region, a {@link NodeExistsException} will be thrown.
1043    *
1044    * @param zkw zk reference
1045    * @param region region to be created as offline
1046    * @param serverName server event originates from
1047    * @throws KeeperException
1048    * @throws IOException
1049    */
1050   public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
1051       final ServerName serverName, final HRegionInfo a,
1052       final HRegionInfo b) throws KeeperException, IOException {
1053     LOG.debug(zkw.prefix("Creating ephemeral node for " +
1054       region.getEncodedName() + " in PENDING_SPLIT state"));
1055     byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
1056     RegionTransition rt = RegionTransition.createRegionTransition(
1057       RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
1058     String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
1059     if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
1060       throw new IOException("Failed create of ephemeral " + node);
1061     }
1062   }
1063 
1064   /**
1065    * Transitions an existing ephemeral node for the specified region which is
1066    * currently in the begin state to be in the end state. Master cleans up the
1067    * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
1068    *
1069    * <p>Does not transition nodes from other states. If for some reason the
1070    * node could not be transitioned, the method returns -1. If the transition
1071    * is successful, the version of the node after transition is returned.
1072    *
1073    * <p>This method can fail and return false for three different reasons:
1074    * <ul><li>Node for this region does not exist</li>
1075    * <li>Node for this region is not in the begin state</li>
1076    * <li>After verifying the begin state, update fails because of wrong version
1077    * (this should never actually happen since an RS only does this transition
1078    * following a transition to the begin state. If two RS are conflicting, one would
1079    * fail the original transition to the begin state and not this transition)</li>
1080    * </ul>
1081    *
1082    * <p>Does not set any watches.
1083    *
1084    * <p>This method should only be used by a RegionServer when splitting a region.
1085    *
1086    * @param zkw zk reference
1087    * @param parent region to be transitioned to opened
1088    * @param a Daughter a of split
1089    * @param b Daughter b of split
1090    * @param serverName server event originates from
1091    * @param znodeVersion expected version of data before modification
1092    * @param beginState the expected current state the znode should be
1093    * @param endState the state to be transition to
1094    * @return version of node after transition, -1 if unsuccessful transition
1095    * @throws KeeperException if unexpected zookeeper exception
1096    * @throws IOException
1097    */
1098   public static int transitionSplittingNode(ZooKeeperWatcher zkw,
1099       HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
1100       final int znodeVersion, final EventType beginState,
1101       final EventType endState) throws KeeperException, IOException {
1102     byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
1103     return ZKAssign.transitionNode(zkw, parent, serverName,
1104       beginState, endState, znodeVersion, payload);
1105   }
1106 
1107   List<JournalEntry> getJournal() {
1108     return journal;
1109   }
1110 }