View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.fail;
24  
25  import java.io.IOException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.testclassification.LargeTests;
30  import org.apache.hadoop.hbase.MiniHBaseCluster;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.client.ResultScanner;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.JVMClusterUtil;
39  import org.junit.Before;
40  import org.junit.Test;
41  import org.junit.experimental.categories.Category;
42  
43  /**
44   * Test handling of changes to the number of a peer's regionservers.
45   */
46  @Category(LargeTests.class)
47  public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
48  
49    private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
50  
51    /**
52     * @throws java.lang.Exception
53     */
54    @Before
55    public void setUp() throws Exception {
56      htable1.setAutoFlush(false, true);
57      // Starting and stopping replication can make us miss new logs,
58      // rolling like this makes sure the most recent one gets added to the queue
59      for (JVMClusterUtil.RegionServerThread r :
60                            utility1.getHBaseCluster().getRegionServerThreads()) {
61        r.getRegionServer().getWAL().rollWriter();
62      }
63      utility1.truncateTable(tableName);
64      // truncating the table will send one Delete per row to the slave cluster
65      // in an async fashion, which is why we cannot just call truncateTable on
66      // utility2 since late writes could make it to the slave in some way.
67      // Instead, we truncate the first table and wait for all the Deletes to
68      // make it to the slave.
69      Scan scan = new Scan();
70      int lastCount = 0;
71      for (int i = 0; i < NB_RETRIES; i++) {
72        if (i == NB_RETRIES - 1) {
73          fail("Waited too much time for truncate");
74        }
75        ResultScanner scanner = htable2.getScanner(scan);
76        Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
77        scanner.close();
78        if (res.length != 0) {
79          if (res.length < lastCount) {
80            i--; // Don't increment timeout if we make progress
81          }
82          lastCount = res.length;
83          LOG.info("Still got " + res.length + " rows");
84          Thread.sleep(SLEEP_TIME);
85        } else {
86          break;
87        }
88      }
89    }
90  
91    @Test(timeout = 300000)
92    public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
93  
94      LOG.info("testSimplePutDelete");
95      MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
96  
97      doPutTest(Bytes.toBytes(1));
98  
99      int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
100     peerCluster.stopRegionServer(rsToStop);
101     peerCluster.waitOnRegionServer(rsToStop);
102 
103     // Sanity check
104     assertEquals(1, peerCluster.getRegionServerThreads().size());
105 
106     doPutTest(Bytes.toBytes(2));
107 
108     peerCluster.startRegionServer();
109 
110     // Sanity check
111     assertEquals(2, peerCluster.getRegionServerThreads().size());
112 
113     doPutTest(Bytes.toBytes(3));
114 
115   }
116 
117   private void doPutTest(byte[] row) throws IOException, InterruptedException {
118     Put put = new Put(row);
119     put.add(famName, row, row);
120 
121     htable1 = new HTable(conf1, tableName);
122     htable1.put(put);
123 
124     Get get = new Get(row);
125     for (int i = 0; i < NB_RETRIES; i++) {
126       if (i == NB_RETRIES - 1) {
127         fail("Waited too much time for put replication");
128       }
129       Result res = htable2.get(get);
130       if (res.size() == 0) {
131         LOG.info("Row not available");
132         Thread.sleep(SLEEP_TIME);
133       } else {
134         assertArrayEquals(res.value(), row);
135         break;
136       }
137     }
138 
139   }
140 
141 }