View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertArrayEquals;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.Random;
30  import java.util.Set;
31  
32  import org.apache.commons.io.IOUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Chore;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.HBaseTestingUtility;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.testclassification.LargeTests;
42  import org.apache.hadoop.hbase.NotServingRegionException;
43  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
44  import org.apache.hadoop.hbase.ServerName;
45  import org.apache.hadoop.hbase.Stoppable;
46  import org.apache.hadoop.hbase.client.Get;
47  import org.apache.hadoop.hbase.client.HBaseAdmin;
48  import org.apache.hadoop.hbase.client.HConnection;
49  import org.apache.hadoop.hbase.client.HConnectionManager;
50  import org.apache.hadoop.hbase.client.HTable;
51  import org.apache.hadoop.hbase.client.MetaScanner;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.Scan;
55  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
56  import org.apache.hadoop.hbase.protobuf.RequestConverter;
57  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.ConfigUtil;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.hbase.util.PairOfSameType;
62  import org.apache.hadoop.hbase.util.StoppableImplementation;
63  import org.apache.hadoop.hbase.util.Threads;
64  import org.junit.AfterClass;
65  import org.junit.BeforeClass;
66  import org.junit.Test;
67  import org.junit.experimental.categories.Category;
68  
69  import com.google.common.collect.Iterators;
70  import com.google.common.collect.Sets;
71  import com.google.protobuf.ServiceException;
72  
73  @Category(LargeTests.class)
74  public class TestEndToEndSplitTransaction {
/** Logger used by the tests and by the static {@code log} helper of this class. */
private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class);
/** Testing utility whose mini cluster is started before and shut down after the tests. */
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/** Configuration obtained from {@code TEST_UTIL}; used by the tests and the helper methods. */
private static final Configuration conf = TEST_UTIL.getConfiguration();
78  
79    @BeforeClass
80    public static void beforeAllTests() throws Exception {
81      TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
82      TEST_UTIL.startMiniCluster();
83    }
84  
/** Shuts down the mini cluster that {@code beforeAllTests} started. */
@AfterClass
public static void afterAllTests() throws Exception {
  TEST_UTIL.shutdownMiniCluster();
}
89  
90    @Test
91    public void testMasterOpsWhileSplitting() throws Exception {
92      TableName tableName =
93          TableName.valueOf("TestSplit");
94      byte[] familyName = Bytes.toBytes("fam");
95      HTable ht = TEST_UTIL.createTable(tableName, familyName);
96      TEST_UTIL.loadTable(ht, familyName, false);
97      ht.close();
98      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
99      byte []firstRow = Bytes.toBytes("aaa");
100     byte []splitRow = Bytes.toBytes("lll");
101     byte []lastRow = Bytes.toBytes("zzz");
102     HConnection con = HConnectionManager
103         .getConnection(TEST_UTIL.getConfiguration());
104     // this will also cache the region
105     byte[] regionName = con.locateRegion(tableName, splitRow).getRegionInfo()
106         .getRegionName();
107     HRegion region = server.getRegion(regionName);
108     SplitTransaction split = new SplitTransaction(region, splitRow);
109     split.useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
110     split.prepare();
111 
112     // 1. phase I
113     PairOfSameType<HRegion> regions = split.createDaughters(server, server);
114     assertFalse(test(con, tableName, firstRow, server));
115     assertFalse(test(con, tableName, lastRow, server));
116 
117     // passing null as services prevents final step
118     // 2, most of phase II
119     split.openDaughters(server, null, regions.getFirst(), regions.getSecond());
120     assertFalse(test(con, tableName, firstRow, server));
121     assertFalse(test(con, tableName, lastRow, server));
122 
123     // 3. finish phase II
124     // note that this replicates some code from SplitTransaction
125     // 2nd daughter first
126     server.postOpenDeployTasks(regions.getSecond(), server.getCatalogTracker());
127     // Add to online regions
128     server.addToOnlineRegions(regions.getSecond());
129     // THIS is the crucial point:
130     // the 2nd daughter was added, so querying before the split key should fail.
131     assertFalse(test(con, tableName, firstRow, server));
132     // past splitkey is ok.
133     assertTrue(test(con, tableName, lastRow, server));
134 
135     // first daughter second
136     server.postOpenDeployTasks(regions.getFirst(), server.getCatalogTracker());
137     // Add to online regions
138     server.addToOnlineRegions(regions.getFirst());
139     assertTrue(test(con, tableName, firstRow, server));
140     assertTrue(test(con, tableName, lastRow, server));
141 
142     // 4. phase III
143     if (split.useZKForAssignment) {
144       split.transitionZKNode(server, server, regions.getFirst(), regions.getSecond());
145     }
146     assertTrue(test(con, tableName, firstRow, server));
147     assertTrue(test(con, tableName, lastRow, server));
148   }
149 
150   /**
151    * attempt to locate the region and perform a get and scan
152    * @return True if successful, False otherwise.
153    */
154   private boolean test(HConnection con, TableName tableName, byte[] row,
155       HRegionServer server) {
156     // not using HTable to avoid timeouts and retries
157     try {
158       byte[] regionName = con.relocateRegion(tableName, row).getRegionInfo()
159           .getRegionName();
160       // get and scan should now succeed without exception
161       ProtobufUtil.get(server, regionName, new Get(row));
162       ScanRequest scanRequest = RequestConverter.buildScanRequest(
163         regionName, new Scan(row), 1, true);
164       try {
165         server.scan(new PayloadCarryingRpcController(), scanRequest);
166       } catch (ServiceException se) {
167         throw ProtobufUtil.getRemoteException(se);
168       }
169     } catch (IOException x) {
170       return false;
171     }
172     return true;
173   }
174 
175   /**
176    * Tests that the client sees meta table changes as atomic during splits
177    */
178   @Test
179   public void testFromClientSideWhileSplitting() throws Throwable {
180     LOG.info("Starting testFromClientSideWhileSplitting");
181     final TableName TABLENAME =
182         TableName.valueOf("testFromClientSideWhileSplitting");
183     final byte[] FAMILY = Bytes.toBytes("family");
184 
185     //SplitTransaction will update the meta table by offlining the parent region, and adding info
186     //for daughters.
187     HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
188 
189     Stoppable stopper = new StoppableImplementation();
190     RegionSplitter regionSplitter = new RegionSplitter(table);
191     RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
192 
193     regionChecker.start();
194     regionSplitter.start();
195 
196     //wait until the splitter is finished
197     regionSplitter.join();
198     stopper.stop(null);
199 
200     if (regionChecker.ex != null) {
201       throw regionChecker.ex;
202     }
203 
204     if (regionSplitter.ex != null) {
205       throw regionSplitter.ex;
206     }
207 
208     //one final check
209     regionChecker.verify();
210   }
211 
212   static class RegionSplitter extends Thread {
213     Throwable ex;
214     HTable table;
215     TableName tableName;
216     byte[] family;
217     HBaseAdmin admin;
218     HRegionServer rs;
219 
220     RegionSplitter(HTable table) throws IOException {
221       this.table = table;
222       this.tableName = table.getName();
223       this.family = table.getTableDescriptor().getFamiliesKeys().iterator().next();
224       admin = TEST_UTIL.getHBaseAdmin();
225       rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
226     }
227 
228     public void run() {
229       try {
230         Random random = new Random();
231         for (int i=0; i< 5; i++) {
232           NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, null,
233               tableName, false);
234           if (regions.size() == 0) {
235             continue;
236           }
237           int regionIndex = random.nextInt(regions.size());
238 
239           //pick a random region and split it into two
240           HRegionInfo region = Iterators.get(regions.keySet().iterator(), regionIndex);
241 
242           //pick the mid split point
243           int start = 0, end = Integer.MAX_VALUE;
244           if (region.getStartKey().length > 0) {
245             start = Bytes.toInt(region.getStartKey());
246           }
247           if (region.getEndKey().length > 0) {
248             end = Bytes.toInt(region.getEndKey());
249           }
250           int mid = start + ((end - start) / 2);
251           byte[] splitPoint = Bytes.toBytes(mid);
252 
253           //put some rows to the regions
254           addData(start);
255           addData(mid);
256 
257           flushAndBlockUntilDone(admin, rs, region.getRegionName());
258           compactAndBlockUntilDone(admin, rs, region.getRegionName());
259 
260           log("Initiating region split for:" + region.getRegionNameAsString());
261           try {
262             admin.split(region.getRegionName(), splitPoint);
263             //wait until the split is complete
264             blockUntilRegionSplit(conf, 50000, region.getRegionName(), true);
265 
266           } catch (NotServingRegionException ex) {
267             //ignore
268           }
269         }
270       } catch (Throwable ex) {
271         this.ex = ex;
272       }
273     }
274 
275     void addData(int start) throws IOException {
276       for (int i=start; i< start + 100; i++) {
277         Put put = new Put(Bytes.toBytes(i));
278 
279         put.add(family, family, Bytes.toBytes(i));
280         table.put(put);
281       }
282       table.flushCommits();
283     }
284   }
285 
286   /**
287    * Checks regions using MetaScanner, MetaReader and HTable methods
288    */
289   static class RegionChecker extends Chore {
290     Configuration conf;
291     TableName tableName;
292     Throwable ex;
293 
294     RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) {
295       super("RegionChecker", 10, stopper);
296       this.conf = conf;
297       this.tableName = tableName;
298       this.setDaemon(true);
299     }
300 
301     /** verify region boundaries obtained from MetaScanner */
302     void verifyRegionsUsingMetaScanner() throws Exception {
303 
304       //MetaScanner.allTableRegions()
305       NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, null,
306           tableName, false);
307       verifyTableRegions(regions.keySet());
308 
309       //MetaScanner.listAllRegions()
310       List<HRegionInfo> regionList = MetaScanner.listAllRegions(conf, false);
311       verifyTableRegions(Sets.newTreeSet(regionList));
312     }
313 
314     /** verify region boundaries obtained from HTable.getStartEndKeys() */
315     void verifyRegionsUsingHTable() throws IOException {
316       HTable table = null;
317       try {
318         //HTable.getStartEndKeys()
319         table = new HTable(conf, tableName);
320         Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
321         verifyStartEndKeys(keys);
322 
323         //HTable.getRegionsInfo()
324         Map<HRegionInfo, ServerName> regions = table.getRegionLocations();
325         verifyTableRegions(regions.keySet());
326       } finally {
327         IOUtils.closeQuietly(table);
328       }
329     }
330 
331     void verify() throws Exception {
332       verifyRegionsUsingMetaScanner();
333       verifyRegionsUsingHTable();
334     }
335 
336     void verifyTableRegions(Set<HRegionInfo> regions) {
337       log("Verifying " + regions.size() + " regions");
338 
339       byte[][] startKeys = new byte[regions.size()][];
340       byte[][] endKeys = new byte[regions.size()][];
341 
342       int i=0;
343       for (HRegionInfo region : regions) {
344         startKeys[i] = region.getStartKey();
345         endKeys[i] = region.getEndKey();
346         i++;
347       }
348 
349       Pair<byte[][], byte[][]> keys = new Pair<byte[][], byte[][]>(startKeys, endKeys);
350       verifyStartEndKeys(keys);
351     }
352 
353     void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
354       byte[][] startKeys = keys.getFirst();
355       byte[][] endKeys = keys.getSecond();
356       assertEquals(startKeys.length, endKeys.length);
357       assertTrue("Found 0 regions for the table", startKeys.length > 0);
358 
359       assertArrayEquals("Start key for the first region is not byte[0]",
360           HConstants.EMPTY_START_ROW, startKeys[0]);
361       byte[] prevEndKey = HConstants.EMPTY_START_ROW;
362 
363       // ensure that we do not have any gaps
364       for (int i=0; i<startKeys.length; i++) {
365         assertArrayEquals(
366             "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
367                 + " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]), prevEndKey,
368             startKeys[i]);
369         prevEndKey = endKeys[i];
370       }
371       assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
372           endKeys[endKeys.length - 1]);
373     }
374 
375     @Override
376     protected void chore() {
377       try {
378         verify();
379       } catch (Throwable ex) {
380         this.ex = ex;
381         stopper.stop("caught exception");
382       }
383     }
384   }
385 
/** Writes {@code msg} to this class's logger at INFO level. */
public static void log(String msg) {
  LOG.info(msg);
}
389 
390   /* some utility methods for split tests */
391 
392   public static void flushAndBlockUntilDone(HBaseAdmin admin, HRegionServer rs, byte[] regionName)
393       throws IOException, InterruptedException {
394     log("flushing region: " + Bytes.toStringBinary(regionName));
395     admin.flush(regionName);
396     log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
397     Threads.sleepWithoutInterrupt(500);
398     while (rs.cacheFlusher.getFlushQueueSize() > 0) {
399       Threads.sleep(50);
400     }
401   }
402 
403   public static void compactAndBlockUntilDone(HBaseAdmin admin, HRegionServer rs, byte[] regionName)
404       throws IOException, InterruptedException {
405     log("Compacting region: " + Bytes.toStringBinary(regionName));
406     admin.majorCompact(regionName);
407     log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
408     Threads.sleepWithoutInterrupt(500);
409     while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
410       Threads.sleep(50);
411     }
412   }
413 
414   /** Blocks until the region split is complete in hbase:meta and region server opens the daughters */
415   public static void blockUntilRegionSplit(Configuration conf, long timeout,
416       final byte[] regionName, boolean waitForDaughters)
417       throws IOException, InterruptedException {
418     long start = System.currentTimeMillis();
419     log("blocking until region is split:" +  Bytes.toStringBinary(regionName));
420     HRegionInfo daughterA = null, daughterB = null;
421     HTable metaTable = new HTable(conf, TableName.META_TABLE_NAME);
422 
423     try {
424       while (System.currentTimeMillis() - start < timeout) {
425         Result result = getRegionRow(metaTable, regionName);
426         if (result == null) {
427           break;
428         }
429 
430         HRegionInfo region = HRegionInfo.getHRegionInfo(result);
431         if(region.isSplitParent()) {
432           log("found parent region: " + region.toString());
433           PairOfSameType<HRegionInfo> pair = HRegionInfo.getDaughterRegions(result);
434           daughterA = pair.getFirst();
435           daughterB = pair.getSecond();
436           break;
437         }
438         Threads.sleep(100);
439       }
440 
441       //if we are here, this means the region split is complete or timed out
442       if (waitForDaughters) {
443         long rem = timeout - (System.currentTimeMillis() - start);
444         blockUntilRegionIsInMeta(metaTable, rem, daughterA);
445 
446         rem = timeout - (System.currentTimeMillis() - start);
447         blockUntilRegionIsInMeta(metaTable, rem, daughterB);
448 
449         rem = timeout - (System.currentTimeMillis() - start);
450         blockUntilRegionIsOpened(conf, rem, daughterA);
451 
452         rem = timeout - (System.currentTimeMillis() - start);
453         blockUntilRegionIsOpened(conf, rem, daughterB);
454       }
455     } finally {
456       IOUtils.closeQuietly(metaTable);
457     }
458   }
459 
460   public static Result getRegionRow(HTable metaTable, byte[] regionName) throws IOException {
461     Get get = new Get(regionName);
462     return metaTable.get(get);
463   }
464 
465   public static void blockUntilRegionIsInMeta(HTable metaTable, long timeout, HRegionInfo hri)
466       throws IOException, InterruptedException {
467     log("blocking until region is in META: " + hri.getRegionNameAsString());
468     long start = System.currentTimeMillis();
469     while (System.currentTimeMillis() - start < timeout) {
470       Result result = getRegionRow(metaTable, hri.getRegionName());
471       if (result != null) {
472         HRegionInfo info = HRegionInfo.getHRegionInfo(result);
473         if (info != null && !info.isOffline()) {
474           log("found region in META: " + hri.getRegionNameAsString());
475           break;
476         }
477       }
478       Threads.sleep(10);
479     }
480   }
481 
482   public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri)
483       throws IOException, InterruptedException {
484     log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
485     long start = System.currentTimeMillis();
486     HTable table = new HTable(conf, hri.getTable());
487 
488     try {
489       byte [] row = hri.getStartKey();
490       // Check for null/empty row.  If we find one, use a key that is likely to be in first region.
491       if (row == null || row.length <= 0) row = new byte [] {'0'};
492       Get get = new Get(row);
493       while (System.currentTimeMillis() - start < timeout) {
494         try {
495           table.get(get);
496           break;
497         } catch(IOException ex) {
498           //wait some more
499         }
500         Threads.sleep(10);
501       }
502     } finally {
503       IOUtils.closeQuietly(table);
504     }
505   }
506 }
507