/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ MediumTests.class })
public class TestCompactionWithThroughputController {

  private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final double EPSILON = 1E-6;

  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());

  private final byte[] family = Bytes.toBytes("f");

  private final byte[] qualifier = Bytes.toBytes("q");

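  /**
   * Returns the first store of the given table found on any region server in the mini cluster,
   * or {@code null} if no region of the table is online.
   */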
  private Store getStoreWithName(TableName tableName) {
    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      for (HRegion region : hrs.getOnlineRegions(tableName)) {
        return region.getStores().values().iterator().next();
      }
    }
    return null;
  }

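  /**
   * Drops any existing table, then loads 100 rows of 128KB random values, flushing after every
   * 10 rows, so the store ends up with 10 store files (roughly 12.8MB in total).
   */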
  private Store prepareData() throws IOException, InterruptedException {
    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    HTable table = TEST_UTIL.createTable(tableName, family);
    Random rand = new Random();
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        rand.nextBytes(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value));
      }
      admin.flush(tableName.getName());
    }
    return getStoreWithName(tableName);
  }

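  /**
   * Major compacts the 10 store files with the compaction throughput pinned at 1MB/s (both tuning
   * bounds set to the same value) and returns how long the compaction took. With roughly 12.8MB
   * of data the throttled compaction should need over 12 seconds, so the file count is re-checked
   * after 5 seconds to verify that the compaction is still in progress.
   */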
  private long testCompactionWithThroughputLimit() throws Exception {
    long throughputLimit = 1024L * 1024;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
      throughputLimit);
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
      throughputLimit);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      Store store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = System.currentTimeMillis();
      TEST_UTIL.getHBaseAdmin().majorCompact(tableName.getName());
      // The compaction should be throttled hard enough that it is still running after 5 seconds.
      Thread.sleep(5000);
      assertEquals(10, store.getStorefilesCount());
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      long duration = System.currentTimeMillis() - startTime;
      double throughput = (double) store.getStorefilesSize() / duration * 1000;
      // Confirm that the speed limit works properly (not too fast, and also not too slow);
      // 20% is the max acceptable error rate.
      assertTrue(throughput < throughputLimit * 1.2);
      assertTrue(throughput > throughputLimit * 0.8);
      return duration;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

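  /**
   * Major compacts the same data set with {@link NoLimitCompactionThroughputController} installed
   * and returns how long the compaction took, as a baseline for the throttled run.
   */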
  private long testCompactionWithoutThroughputLimit() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitCompactionThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      Store store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = System.currentTimeMillis();
      TEST_UTIL.getHBaseAdmin().majorCompact(tableName.getName());
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      return System.currentTimeMillis() - startTime;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

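  /**
   * Runs the same major compaction with and without the 1MB/s limit and asserts that the
   * throttled run takes at least twice as long.
   */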
  @Test
  public void testCompaction() throws Exception {
    long limitTime = testCompactionWithThroughputLimit();
    long noLimitTime = testCompactionWithoutThroughputLimit();
    LOG.info("With 1MB/s limit, compaction took " + limitTime
        + "ms; without limit, compaction took " + noLimitTime + "ms");
    // An unthrottled compaction usually runs at 40MB/s or more, so this is a very weak assertion.
    assertTrue(limitTime > noLimitTime * 2);
  }

  /**
   * Test the tuning task of {@link PressureAwareCompactionThroughputController}.
   */
  @Test
  public void testThroughputTuning() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
      20L * 1024 * 1024);
    conf.setLong(
      PressureAwareCompactionThroughputController
        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
      10L * 1024 * 1024);
    conf.setInt(CompactionConfiguration.MIN_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    conf.setInt(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
      1000);
    TEST_UTIL.startMiniCluster(1);
    HConnection conn = HConnectionManager.createConnection(conf);
    try {
      HTableDescriptor htd = new HTableDescriptor(tableName);
      htd.addFamily(new HColumnDescriptor(family));
      htd.setCompactionEnabled(false);
      TEST_UTIL.getHBaseAdmin().createTable(htd);
      TEST_UTIL.waitTableAvailable(tableName.getName());
      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
      PressureAwareCompactionThroughputController throughputController =
          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
              .getCompactionThroughputController();
      assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
      HTableInterface table = conn.getTable(tableName);
      for (int i = 0; i < 5; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
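      // With MIN_KEY = 4 and BLOCKING_STOREFILES_KEY = 6, five store files should put the
      // compaction pressure at (5 - 4) / (6 - 4) = 0.5, so the next tuning run is expected to
      // set the limit to 10MB/s + (20MB/s - 10MB/s) * 0.5 = 15MB/s.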
      Thread.sleep(2000);
      assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

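      // A sixth file should raise the pressure to (6 - 4) / (6 - 4) = 1.0, tuning the limit up
      // to the 20MB/s upper bound.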
      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

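      // A seventh file takes the store past the blocking file count, and with the pressure above
      // 1.0 the controller is expected to drop the limit entirely.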
      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Test the compaction pressure calculation for a striped store.
   */
  @Test
  public void testGetCompactionPressureForStripedStore() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
    TEST_UTIL.startMiniCluster(1);
    HConnection conn = HConnectionManager.createConnection(conf);
    try {
      HTableDescriptor htd = new HTableDescriptor(tableName);
      htd.addFamily(new HColumnDescriptor(family));
      htd.setCompactionEnabled(false);
      TEST_UTIL.getHBaseAdmin().createTable(htd);
      TEST_UTIL.waitTableAvailable(tableName.getName());
      HStore store = (HStore) getStoreWithName(tableName);
      assertEquals(0, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
      HTableInterface table = conn.getTable(tableName);
      for (int i = 0; i < 4; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
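      // Each flush writes one file per stripe, so 4 flushes leave 4 files in each of the 2
      // stripes, exactly the MIN_FILES_KEY minimum, and the pressure should still be 0.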
      assertEquals(8, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);

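      // 5 files per stripe against a per-stripe blocking count of 12 / 2 = 6 should give a
      // pressure of (5 - 4) / (6 - 4) = 0.5.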
      table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(10, store.getStorefilesCount());
      assertEquals(0.5, store.getCompactionPressure(), EPSILON);

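      // 6 files per stripe reaches the per-stripe blocking count, so the pressure should hit 1.0.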
      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(12, store.getStorefilesCount());
      assertEquals(1.0, store.getCompactionPressure(), EPSILON);

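      // Past the overall blocking count of 12 the striped store appears to report a flat
      // pressure of 2.0.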
      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(14, store.getStorefilesCount());
      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}