/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;

/** Creates multiple threads that update key/values in a table. */
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
  private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);

  protected Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();

  private MultiThreadedWriterBase writer = null;
  private boolean isBatchUpdate = false;
  private boolean ignoreNonceConflicts = false;
  private final double updatePercent;

  public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double updatePercent) {
    super(dataGen, conf, tableName, "U");
    this.updatePercent = updatePercent;
  }

  /** Use batch vs. separate updates for every column in a row */
  public void setBatchUpdate(boolean isBatchUpdate) {
    this.isBatchUpdate = isBatchUpdate;
  }

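  /**
   * Links this updater to a concurrent writer so that updates are only
   * attempted on keys the writer has already written: key tracking is turned
   * on for the writer, and {@link #getNextKeyToUpdate()} waits for it to
   * catch up before handing out a key.
   */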
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);

    if (verbose) {
      LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
    }

    addUpdaterThreads(numThreads);

    startThreads(updaters);
  }

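  /** Creates {@code numThreads} updater threads, each with its own HTable instance. */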
  protected void addUpdaterThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
      updaters.add(updater);
    }
  }

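  /**
   * Returns the next key to update, or {@code endKey} once the range is
   * exhausted. When linked to a writer, waits until the writer has written
   * the key, and skips (and records as failed) any key the writer failed on.
   */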
  private long getNextKeyToUpdate() {
    if (writer == null) {
      return nextKeyToWrite.getAndIncrement();
    }
    synchronized (this) {
      if (nextKeyToWrite.get() >= endKey) {
        // Finished the whole key range
        return endKey;
      }
      while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
        Threads.sleepWithoutInterrupt(100);
      }
      long k = nextKeyToWrite.getAndIncrement();
      if (writer.failedToWriteKey(k)) {
        failedKeySet.add(k);
        return getNextKeyToUpdate();
      }
      return k;
    }
  }

  protected class HBaseUpdaterThread extends Thread {
    protected final HTable table;

    public HBaseUpdaterThread(int updaterId) throws IOException {
      setName(getClass().getSimpleName() + "_" + updaterId);
      table = createTable();
    }

    protected HTable createTable() throws IOException {
      return new HTable(conf, tableName);
    }

    @Override
    public void run() {
      try {
        long rowKeyBase;
        StringBuilder buf = new StringBuilder();
        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
        while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
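          // Only mutate a sampled updatePercent of the rows handed to this thread.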
          if (RandomUtils.nextInt(100) < updatePercent) {
            byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
            Increment inc = new Increment(rowKey);
            Append app = new Append(rowKey);
            numKeys.addAndGet(1);
            int columnCount = 0;
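            // For each column family, bump the INCREMENT counter and record
            // every mutation applied to the row under the MUTATE_INFO qualifier.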
            for (byte[] cf : columnFamilies) {
              long cfHash = Arrays.hashCode(cf);
              inc.addColumn(cf, INCREMENT, cfHash);
              buf.setLength(0); // Clear the buffer
              buf.append("#").append(Bytes.toString(INCREMENT));
              buf.append(":").append(MutationType.INCREMENT.getNumber());
              app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
              ++columnCount;
              if (!isBatchUpdate) {
                mutate(table, inc, rowKeyBase);
                numCols.addAndGet(1);
                inc = new Increment(rowKey);
                mutate(table, app, rowKeyBase);
                numCols.addAndGet(1);
                app = new Append(rowKey);
              }
              Get get = new Get(rowKey);
              get.addFamily(cf);
              try {
                get = dataGenerator.beforeGet(rowKeyBase, get);
              } catch (Exception e) {
                // Ideally won't happen
                LOG.warn("Failed to modify the get from the load generator, row = ["
                    + Bytes.toString(get.getRow())
                    + "], column family = [" + Bytes.toString(cf) + "]", e);
              }
              Result result = getRow(get, rowKeyBase, cf);
              Map<byte[], byte[]> columnValues =
                result != null ? result.getFamilyMap(cf) : null;
              if (columnValues == null) {
                int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
                if ((int) rowKeyBase % specialPermCellInsertionFactor == 0) {
                  LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
                } else {
                  failedKeySet.add(rowKeyBase);
                  LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
                      + "], since we could not get the original row");
                }
              } else {
                for (byte[] column : columnValues.keySet()) {
                  if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
                    continue;
                  }
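                  // Pick a random mutation type; PUT and DELETE are applied
                  // as checkAndPut/checkAndDelete against the value read above.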
                  MutationType mt = MutationType
                      .valueOf(RandomUtils.nextInt(MutationType.values().length));
                  long columnHash = Arrays.hashCode(column);
                  long hashCode = cfHash + columnHash;
                  byte[] hashCodeBytes = Bytes.toBytes(hashCode);
                  byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
                  if (hashCode % 2 == 0) {
                    Cell kv = result.getColumnLatestCell(cf, column);
                    checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
                    Preconditions.checkNotNull(checkedValue,
                        "Column value to be checked should not be null");
                  }
                  buf.setLength(0); // Clear the buffer
                  buf.append("#").append(Bytes.toString(column)).append(":");
                  ++columnCount;
                  switch (mt) {
                  case PUT:
                    Put put = new Put(rowKey);
                    put.add(cf, column, hashCodeBytes);
                    mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
                    buf.append(MutationType.PUT.getNumber());
                    break;
                  case DELETE:
                    Delete delete = new Delete(rowKey);
                    // Delete all versions since a put
                    // could be called multiple times if CM is used
                    delete.deleteColumns(cf, column);
                    mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
                    buf.append(MutationType.DELETE.getNumber());
                    break;
                  default:
                    buf.append(MutationType.APPEND.getNumber());
                    app.add(cf, column, hashCodeBytes);
                  }
                  app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
                  if (!isBatchUpdate) {
                    mutate(table, app, rowKeyBase);
                    numCols.addAndGet(1);
                    app = new Append(rowKey);
                  }
                }
              }
            }
            if (isBatchUpdate) {
              if (verbose) {
                LOG.debug("Preparing increment and append for key = ["
                  + Bytes.toString(rowKey) + "], " + columnCount + " columns");
              }
              mutate(table, inc, rowKeyBase);
              mutate(table, app, rowKeyBase);
              numCols.addAndGet(columnCount);
            }
          }
          if (trackWroteKeys) {
            wroteKeys.add(rowKeyBase);
          }
        }
      } finally {
        closeHTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    protected void closeHTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

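    /** Fetches the row for the given Get, returning null (after logging a warning) on failure. */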
    protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
      Result result = null;
      try {
        result = table.get(get);
      } catch (IOException ie) {
        LOG.warn(
            "Failed to get the row for key = [" + Bytes.toString(get.getRow())
                + "], column family = [" + Bytes.toString(cf) + "]", ie);
      }
      return result;
    }

    public void mutate(HTable table, Mutation m, long keyBase) {
      mutate(table, m, keyBase, null, null, null, null);
    }

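    /**
     * Applies the mutation after the data generator's beforeMutate hook,
     * dispatching on its type: Increment and Append go straight to the table,
     * while Put and Delete run as checkAndPut/checkAndDelete against the
     * expected value {@code v}. Failures are recorded in failedKeySet, and
     * nonce conflicts are optionally ignored.
     */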
    public void mutate(HTable table, Mutation m,
        long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
      long start = System.currentTimeMillis();
      try {
        m = dataGenerator.beforeMutate(keyBase, m);
        if (m instanceof Increment) {
          table.increment((Increment)m);
        } else if (m instanceof Append) {
          table.append((Append)m);
        } else if (m instanceof Put) {
          table.checkAndPut(row, cf, q, v, (Put)m);
        } else if (m instanceof Delete) {
          table.checkAndDelete(row, cf, q, v, (Delete)m);
        } else {
          throw new IllegalArgumentException(
            "unsupported mutation " + m.getClass().getSimpleName());
        }
        totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
      } catch (IOException e) {
        if (ignoreNonceConflicts && (e instanceof OperationConflictException)) {
          LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
          totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
          return;
        }
        failedKeySet.add(keyBase);
        String exceptionInfo;
        if (e instanceof RetriesExhaustedWithDetailsException) {
          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
          exceptionInfo = aggEx.getExhaustiveDescription();
        } else {
          exceptionInfo = StringUtils.stringifyException(e);
        }
        LOG.error("Failed to mutate: " + keyBase + " after "
            + (System.currentTimeMillis() - start)
            + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow())
            + "; errors: " + exceptionInfo);
      }
    }
  }

  @Override
  public void waitForFinish() {
    super.waitForFinish();
    System.out.println("Failed to update keys: " + failedKeySet.size());
    for (Long key : failedKeySet) {
      System.out.println("Failed to update key: " + key);
    }
  }

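  /**
   * Class-level variants of the updater thread's mutate methods; unlike the
   * thread-local version, nonce conflicts are never ignored here.
   */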
  public void mutate(HTable table, Mutation m, long keyBase) {
    mutate(table, m, keyBase, null, null, null, null);
  }

  public void mutate(HTable table, Mutation m,
      long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
    long start = System.currentTimeMillis();
    try {
      m = dataGenerator.beforeMutate(keyBase, m);
      if (m instanceof Increment) {
        table.increment((Increment)m);
      } else if (m instanceof Append) {
        table.append((Append)m);
      } else if (m instanceof Put) {
        table.checkAndPut(row, cf, q, v, (Put)m);
      } else if (m instanceof Delete) {
        table.checkAndDelete(row, cf, q, v, (Delete)m);
      } else {
        throw new IllegalArgumentException(
          "unsupported mutation " + m.getClass().getSimpleName());
      }
      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
    } catch (IOException e) {
      failedKeySet.add(keyBase);
      String exceptionInfo;
      if (e instanceof RetriesExhaustedWithDetailsException) {
        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
        exceptionInfo = aggEx.getExhaustiveDescription();
      } else {
        exceptionInfo = StringUtils.stringifyException(e);
      }
      LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
        "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
          + exceptionInfo);
    }
  }

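  /** If set, updater threads treat OperationConflictException as a benign nonce conflict. */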
  public void setIgnoreNonceConflicts(boolean value) {
    this.ignoreNonceConflicts = value;
  }
}