View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FileSystem;
23  import org.apache.hadoop.fs.Path;
24  import org.apache.hadoop.hbase.HBaseTestingUtility;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.HRegionInfo;
27  import org.apache.hadoop.hbase.HTableDescriptor;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.testclassification.LargeTests;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.regionserver.wal.HLog;
32  import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
33  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
34  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hdfs.MiniDFSCluster;
38  import org.junit.After;
39  import org.junit.AfterClass;
40  import org.junit.Before;
41  import org.junit.BeforeClass;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  import org.junit.runner.RunWith;
45  import org.junit.runners.Parameterized;
46  import org.junit.runners.Parameterized.Parameters;
47  
48  import static org.junit.Assert.*;
49  
50  import java.io.IOException;
51  import java.util.ArrayList;
52  import java.util.Collection;
53  import java.util.List;
54  import java.util.concurrent.atomic.AtomicLong;
55  
56  @Category(LargeTests.class)
57  @RunWith(Parameterized.class)
58  public class TestReplicationHLogReaderManager {
59  
60    private static HBaseTestingUtility TEST_UTIL;
61    private static Configuration conf;
62    private static Path hbaseDir;
63    private static FileSystem fs;
64    private static MiniDFSCluster cluster;
65    private static final TableName tableName = TableName.valueOf("tablename");
66    private static final byte [] family = Bytes.toBytes("column");
67    private static final byte [] qualifier = Bytes.toBytes("qualifier");
68    private static final HRegionInfo info = new HRegionInfo(tableName,
69        HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
70    private static final HTableDescriptor htd = new HTableDescriptor(tableName);
71  
72    private HLog log;
73    private ReplicationHLogReaderManager logManager;
74    private PathWatcher pathWatcher;
75    private int nbRows;
76    private int walEditKVs;
77    private final AtomicLong sequenceId = new AtomicLong(1);
78  
79    @Parameters
80    public static Collection<Object[]> parameters() {
81      // Try out different combinations of row count and KeyValue count
82      int[] NB_ROWS = { 1500, 60000 };
83      int[] NB_KVS = { 1, 100 };
84      // whether compression is used
85      Boolean[] BOOL_VALS = { false, true };
86      List<Object[]> parameters = new ArrayList<Object[]>();
87      for (int nbRows : NB_ROWS) {
88        for (int walEditKVs : NB_KVS) {
89          for (boolean b : BOOL_VALS) {
90            Object[] arr = new Object[3];
91            arr[0] = nbRows;
92            arr[1] = walEditKVs;
93            arr[2] = b;
94            parameters.add(arr);
95          }
96        }
97      }
98      return parameters;
99    }
100 
101   public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
102     this.nbRows = nbRows;
103     this.walEditKVs = walEditKVs;
104     TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
105       enableCompression);
106   }
107   
108   @BeforeClass
109   public static void setUpBeforeClass() throws Exception {
110     TEST_UTIL = new HBaseTestingUtility();
111     conf = TEST_UTIL.getConfiguration();
112     TEST_UTIL.startMiniDFSCluster(3);
113 
114     hbaseDir = TEST_UTIL.createRootDir();
115     cluster = TEST_UTIL.getDFSCluster();
116     fs = cluster.getFileSystem();
117   }
118 
119   @AfterClass
120   public static void tearDownAfterClass() throws Exception {
121     TEST_UTIL.shutdownMiniCluster();
122   }
123 
124   @Before
125   public void setUp() throws Exception {
126     logManager = new ReplicationHLogReaderManager(fs, conf);
127     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
128     pathWatcher = new PathWatcher();
129     listeners.add(pathWatcher);
130     log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server");
131   }
132 
133   @After
134   public void tearDown() throws Exception {
135     log.closeAndDelete();
136   }
137 
138   @Test
139   public void test() throws Exception {
140     // Grab the path that was generated when the log rolled as part of its creation
141     Path path = pathWatcher.currentPath;
142 
143     assertEquals(0, logManager.getPosition());
144 
145     appendToLog();
146 
147     // There's one edit in the log, read it. Reading past it needs to return nulls
148     assertNotNull(logManager.openReader(path));
149     logManager.seek();
150     HLog.Entry entry = logManager.readNextAndSetPosition();
151     assertNotNull(entry);
152     entry = logManager.readNextAndSetPosition();
153     assertNull(entry);
154     logManager.closeReader();
155     long oldPos = logManager.getPosition();
156 
157     appendToLog();
158 
159     // Read the newly added entry, make sure we made progress
160     assertNotNull(logManager.openReader(path));
161     logManager.seek();
162     entry = logManager.readNextAndSetPosition();
163     assertNotEquals(oldPos, logManager.getPosition());
164     assertNotNull(entry);
165     logManager.closeReader();
166     oldPos = logManager.getPosition();
167 
168     log.rollWriter();
169 
170     // We rolled but we still should see the end of the first log and not get data
171     assertNotNull(logManager.openReader(path));
172     logManager.seek();
173     entry = logManager.readNextAndSetPosition();
174     assertEquals(oldPos, logManager.getPosition());
175     assertNull(entry);
176     logManager.finishCurrentFile();
177 
178     path = pathWatcher.currentPath;
179 
180     for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
181     log.rollWriter();
182     logManager.openReader(path);
183     logManager.seek();
184     for (int i = 0; i < nbRows; i++) {
185       HLog.Entry e = logManager.readNextAndSetPosition();
186       if (e == null) {
187         fail("Should have enough entries");
188       }
189     }
190   }
191 
192   private void appendToLog() throws IOException {
193     appendToLogPlus(1);
194   }
195 
196   private void appendToLogPlus(int count) throws IOException {
197     log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
198   }
199 
200   private WALEdit getWALEdits(int count) {
201     WALEdit edit = new WALEdit();
202     for (int i = 0; i < count; i++) {
203       edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
204         System.currentTimeMillis(), qualifier));
205     }
206     return edit;
207   }
208 
209   class PathWatcher implements WALActionsListener {
210 
211     Path currentPath;
212 
213     @Override
214     public void preLogRoll(Path oldPath, Path newPath) throws IOException {
215       currentPath = newPath;
216     }
217 
218     @Override
219     public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
220 
221     @Override
222     public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
223 
224     @Override
225     public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
226 
227     @Override
228     public void logRollRequested(boolean tooFewReplicas) {}
229 
230     @Override
231     public void logCloseRequested() {}
232 
233     @Override
234     public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {}
235 
236     @Override
237     public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {}
238   }
239 }