1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static java.lang.String.format;
22  
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.nio.ByteBuffer;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Collection;
30  import java.util.Deque;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Map.Entry;
38  import java.util.Set;
39  import java.util.TreeMap;
40  import java.util.UUID;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.ExecutorService;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.LinkedBlockingQueue;
46  import java.util.concurrent.ThreadPoolExecutor;
47  import java.util.concurrent.TimeUnit;
48  
49  import org.apache.commons.lang.mutable.MutableInt;
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.conf.Configured;
54  import org.apache.hadoop.fs.FileStatus;
55  import org.apache.hadoop.fs.FileSystem;
56  import org.apache.hadoop.fs.Path;
57  import org.apache.hadoop.fs.permission.FsPermission;
58  import org.apache.hadoop.hbase.HBaseConfiguration;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HTableDescriptor;
62  import org.apache.hadoop.hbase.KeyValue;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.classification.InterfaceAudience;
66  import org.apache.hadoop.hbase.classification.InterfaceStability;
67  import org.apache.hadoop.hbase.client.HBaseAdmin;
68  import org.apache.hadoop.hbase.client.HConnection;
69  import org.apache.hadoop.hbase.client.HTable;
70  import org.apache.hadoop.hbase.client.RegionServerCallable;
71  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
72  import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
73  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
74  import org.apache.hadoop.hbase.io.HFileLink;
75  import org.apache.hadoop.hbase.io.HalfStoreFileReader;
76  import org.apache.hadoop.hbase.io.Reference;
77  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
78  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
79  import org.apache.hadoop.hbase.io.hfile.HFile;
80  import org.apache.hadoop.hbase.io.hfile.HFileContext;
81  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
82  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
83  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
84  import org.apache.hadoop.hbase.regionserver.BloomType;
85  import org.apache.hadoop.hbase.regionserver.HStore;
86  import org.apache.hadoop.hbase.regionserver.StoreFile;
87  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
88  import org.apache.hadoop.hbase.security.UserProvider;
89  import org.apache.hadoop.hbase.security.token.FsDelegationToken;
90  import org.apache.hadoop.hbase.util.Bytes;
91  import org.apache.hadoop.hbase.util.FSHDFSUtils;
92  import org.apache.hadoop.hbase.util.Pair;
93  import org.apache.hadoop.util.Tool;
94  import org.apache.hadoop.util.ToolRunner;
95  
96  import com.google.common.collect.HashMultimap;
97  import com.google.common.collect.Multimap;
98  import com.google.common.collect.Multimaps;
99  import com.google.common.util.concurrent.ThreadFactoryBuilder;
100 
101 /**
102  * Tool to load the output of HFileOutputFormat into an existing table.
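     * <p>A minimal usage sketch (the output path and table name below are hypothetical):
     * <pre>
     *   Configuration conf = HBaseConfiguration.create();
     *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
     *   // same effect as running the "completebulkload" tool with these two arguments
     *   loader.run(new String[] { "/path/to/hfileoutputformat-output", "mytable" });
     * </pre>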
103  * @see #usage()
104  */
105 @InterfaceAudience.Public
106 @InterfaceStability.Stable
107 public class LoadIncrementalHFiles extends Configured implements Tool {
108   private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
109   private HBaseAdmin hbAdmin;
110 
111   public static final String NAME = "completebulkload";
112   public static final String MAX_FILES_PER_REGION_PER_FAMILY
113     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
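      // Illustrative note: the default limit is 32 hfiles per region per family (see initialize());
      // it can be raised on the command line via ToolRunner's -D handling, e.g.
      //   -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=64  (example value)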
114   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
115   public final static String CREATE_TABLE_CONF_KEY = "create.table";
116 
117   private int maxFilesPerRegionPerFamily;
118   private boolean assignSeqIds;
119 
120   // Source filesystem
121   private FileSystem fs;
122   // Source delegation token
123   private FsDelegationToken fsDelegationToken;
124   private String bulkToken;
125   private UserProvider userProvider;
126 
127   private LoadIncrementalHFiles() {}
128 
129   public LoadIncrementalHFiles(Configuration conf) throws Exception {
130     super(conf);
131     initialize();
132   }
133 
134   private void initialize() throws Exception {
135     if (hbAdmin == null) {
136       // make a copy, just to be sure we're not overriding someone else's config
137       setConf(HBaseConfiguration.create(getConf()));
138       Configuration conf = getConf();
139       // disable blockcache for tool invocation, see HBASE-10500
140       conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
141       this.hbAdmin = new HBaseAdmin(conf);
142       this.userProvider = UserProvider.instantiate(conf);
143       this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
144       assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
145       maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
146     }
147   }
148 
149   private void usage() {
150     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
151         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of the table by this tool\n"
152         + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
153         + "\n");
154   }
155 
156   private static interface BulkHFileVisitor<TFamily> {
157     TFamily bulkFamily(final byte[] familyName)
158       throws IOException;
159     void bulkHFile(final TFamily family, final FileStatus hfileStatus)
160       throws IOException;
161   }
162 
163   /**
164    * Iterate over the bulkDir hfiles.
165    * Skips reference files, HFileLinks, files starting with "_", and invalid hfiles.
166    */
167   private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
168     final BulkHFileVisitor<TFamily> visitor) throws IOException {
169     if (!fs.exists(bulkDir)) {
170       throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
171     }
172 
173     FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
174     if (familyDirStatuses == null) {
175       throw new FileNotFoundException("No families found in " + bulkDir);
176     }
177 
178     for (FileStatus familyStat : familyDirStatuses) {
179       if (!familyStat.isDir()) {
180         LOG.warn("Skipping non-directory " + familyStat.getPath());
181         continue;
182       }
183       Path familyDir = familyStat.getPath();
184       byte[] familyName = familyDir.getName().getBytes();
185       TFamily family = visitor.bulkFamily(familyName);
186 
187       FileStatus[] hfileStatuses = fs.listStatus(familyDir);
188       for (FileStatus hfileStatus : hfileStatuses) {
189         if (!fs.isFile(hfileStatus.getPath())) {
190           LOG.warn("Skipping non-file " + hfileStatus);
191           continue;
192         }
193 
194         Path hfile = hfileStatus.getPath();
195         // Skip "_", reference, HFileLink
196         String fileName = hfile.getName();
197         if (fileName.startsWith("_")) {
198           continue;
199         }
200         if (StoreFileInfo.isReference(fileName)) {
201           LOG.warn("Skipping reference " + fileName);
202           continue;
203         }
204         if (HFileLink.isHFileLink(fileName)) {
205           LOG.warn("Skipping HFileLink " + fileName);
206           continue;
207         }
208 
209         // Validate HFile Format
210         try {
211           if (!HFile.isHFileFormat(fs, hfile)) {
212           LOG.warn("the file " + hfile + " doesn't seem to be an hfile, skipping");
213             continue;
214           }
215         } catch (FileNotFoundException e) {
216           LOG.warn("the file " + hfile + " was removed");
217           continue;
218         }
219 
220         visitor.bulkHFile(family, hfileStatus);
221       }
222     }
223   }
224 
225   /**
226    * Represents an HFile waiting to be loaded. A queue is used
227    * in this class to support the case where a region has
228    * split while the load is in progress. When this happens,
229    * the HFile is split into two physical parts across the new
230    * region boundary, and each part is added back into the queue.
231    * The import process finishes when the queue is empty.
232    */
233   static class LoadQueueItem {
234     final byte[] family;
235     final Path hfilePath;
236 
237     public LoadQueueItem(byte[] family, Path hfilePath) {
238       this.family = family;
239       this.hfilePath = hfilePath;
240     }
241 
242     public String toString() {
243       return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
244     }
245   }
246 
247   /**
248    * Walk the given directory for all HFiles, and add them to the
249    * passed-in queue.
250    */
251   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
252   throws IOException {
253     fs = hfofDir.getFileSystem(getConf());
254     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
255       @Override
256       public byte[] bulkFamily(final byte[] familyName) {
257         return familyName;
258       }
259       @Override
260       public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
261         long length = hfile.getLen();
262         if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
263             HConstants.DEFAULT_MAX_FILE_SIZE)) {
264           LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
265               length + " bytes, which can be problematic as it may lead to oversplitting.");
266         }
267         ret.add(new LoadQueueItem(family, hfile.getPath()));
268       }
269     });
270   }
271 
272   /**
273    * Perform a bulk load of the given directory into the given
274    * pre-existing table.  This method is not threadsafe.
275    *
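       * <p>A sketch of calling this directly against an existing table (the path and
       * table name below are hypothetical):
       * <pre>
       *   Configuration conf = HBaseConfiguration.create();
       *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
       *   HTable table = new HTable(conf, TableName.valueOf("mytable"));
       *   loader.doBulkLoad(new Path("/path/to/hfileoutputformat-output"), table);
       * </pre>
       *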
276    * @param hfofDir the directory that was provided as the output path
277    * of a job using HFileOutputFormat
278    * @param table the table to load into
279    * @throws TableNotFoundException if table does not yet exist
280    */
281   @SuppressWarnings("deprecation")
282   public void doBulkLoad(Path hfofDir, final HTable table)
283     throws TableNotFoundException, IOException
284   {
285     final HConnection conn = table.getConnection();
286 
287     if (!conn.isTableAvailable(table.getName())) {
288       throw new TableNotFoundException("Table " +
289           Bytes.toStringBinary(table.getTableName()) +
290           " is not currently available.");
291     }
292 
293     // initialize thread pools
294     int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
295       Runtime.getRuntime().availableProcessors());
296     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
297     builder.setNameFormat("LoadIncrementalHFiles-%1$d");
298     ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
299         60, TimeUnit.SECONDS,
300         new LinkedBlockingQueue<Runnable>(),
301         builder.build());
302     ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
303 
304     // LQI queue does not need to be threadsafe -- all operations on this queue
305     // happen in this thread
306     Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
307     try {
308       discoverLoadQueue(queue, hfofDir);
309       // check whether there are any invalid family names in the HFiles to be bulk loaded
310       Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
311       ArrayList<String> familyNames = new ArrayList<String>(families.size());
312       for (HColumnDescriptor family : families) {
313         familyNames.add(family.getNameAsString());
314       }
315       ArrayList<String> unmatchedFamilies = new ArrayList<String>();
316       Iterator<LoadQueueItem> queueIter = queue.iterator();
317       while (queueIter.hasNext()) {
318         LoadQueueItem lqi = queueIter.next();
319         String familyNameInHFile = Bytes.toString(lqi.family);
320         if (!familyNames.contains(familyNameInHFile)) {
321           unmatchedFamilies.add(familyNameInHFile);
322         }
323       }
324       if (unmatchedFamilies.size() > 0) {
325         String msg =
326             "Unmatched family names found in HFiles to be bulk loaded: "
327                 + unmatchedFamilies + "; valid family names of table "
328                 + Bytes.toString(table.getTableName()) + " are: " + familyNames;
329         LOG.error(msg);
330         throw new IOException(msg);
331       }
332       int count = 0;
333 
334       if (queue.isEmpty()) {
335         LOG.warn("Bulk load operation did not find any files to load in " +
336             "directory " + hfofDir.toUri() + ".  Does it contain files in " +
337             "subdirectories that correspond to column family names?");
338         return;
339       }
340 
341       //If using secure bulk load, get source delegation token, and
342       //prepare staging directory and token
343       // fs is the source filesystem
344       fsDelegationToken.acquireDelegationToken(fs);
345       if(isSecureBulkLoadEndpointAvailable()) {
346         bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
347       }
348 
349       // Assumes that region splits can happen while this occurs.
350       while (!queue.isEmpty()) {
351         // need to reload split keys each iteration.
352         final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
353         if (count != 0) {
354           LOG.info("Split occurred while grouping HFiles, retry attempt " +
355               count + " with " + queue.size() + " files remaining to group or split");
356         }
357 
358         int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 0);
359         if (maxRetries != 0 && count >= maxRetries) {
360           throw new IOException("Retry attempted " + count +
361             " times without completing, bailing out");
362         }
363         count++;
364 
365         // Using ByteBuffer for byte[] equality semantics
366         Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
367             pool, queue, startEndKeys);
368 
369         if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
370           // Error is logged inside checkHFilesCountPerRegionPerFamily.
371           throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
372             + " hfiles to one family of one region");
373         }
374 
375         bulkLoadPhase(table, conn, pool, queue, regionGroups);
376 
377         // NOTE: The next iteration's split / group could happen in parallel to
378         // atomic bulkloads assuming that there are splits and no merges, and
379         // that we can atomically pull out the groups we want to retry.
380       }
381 
382     } finally {
383       fsDelegationToken.releaseDelegationToken();
384       if(bulkToken != null) {
385         new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
386       }
387       pool.shutdown();
388       if (queue != null && !queue.isEmpty()) {
389         StringBuilder err = new StringBuilder();
390         err.append("-------------------------------------------------\n");
391         err.append("Bulk load aborted with some files not yet loaded:\n");
392         err.append("-------------------------------------------------\n");
393         for (LoadQueueItem q : queue) {
394           err.append("  ").append(q.hfilePath).append('\n');
395         }
396         LOG.error(err);
397       }
398     }
399 
400     if (queue != null && !queue.isEmpty()) {
401         throw new RuntimeException("Bulk load aborted with some files not yet loaded."
402           + " Please check the log for more details.");
403     }
404   }
405 
406   /**
407    * This takes the LQI's grouped by likely regions and attempts to bulk load
408    * them.  Any failures are re-queued for another pass with the
409    * groupOrSplitPhase.
410    */
411   protected void bulkLoadPhase(final HTable table, final HConnection conn,
412       ExecutorService pool, Deque<LoadQueueItem> queue,
413       final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
414     // atomically bulk load the groups.
415     Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
416     for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
417       final byte[] first = e.getKey().array();
418       final Collection<LoadQueueItem> lqis =  e.getValue();
419 
420       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
421         public List<LoadQueueItem> call() throws Exception {
422           List<LoadQueueItem> toRetry =
423               tryAtomicRegionLoad(conn, table.getName(), first, lqis);
424           return toRetry;
425         }
426       };
427       loadingFutures.add(pool.submit(call));
428     }
429 
430     // get all the results.
431     for (Future<List<LoadQueueItem>> future : loadingFutures) {
432       try {
433         List<LoadQueueItem> toRetry = future.get();
434 
435         // LQIs that are requeued to be regrouped.
436         queue.addAll(toRetry);
437 
438       } catch (ExecutionException e1) {
439         Throwable t = e1.getCause();
440         if (t instanceof IOException) {
441           // At this point something unrecoverable has happened.
442           // TODO Implement bulk load recovery
443           throw new IOException("BulkLoad encountered an unrecoverable problem", t);
444         }
445         LOG.error("Unexpected execution exception during bulk load", e1);
446         throw new IllegalStateException(t);
447       } catch (InterruptedException e1) {
448         LOG.error("Unexpected interrupted exception during bulk load", e1);
449         throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
450       }
451     }
452   }
453 
454   private boolean checkHFilesCountPerRegionPerFamily(
455       final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
456     for (Entry<ByteBuffer,
457         ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
458       final Collection<LoadQueueItem> lqis =  e.getValue();
459       HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
460       for (LoadQueueItem lqi: lqis) {
461         MutableInt count = filesMap.get(lqi.family);
462         if (count == null) {
463           count = new MutableInt();
464           filesMap.put(lqi.family, count);
465         }
466         count.increment();
467         if (count.intValue() > maxFilesPerRegionPerFamily) {
468           LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
469             + " hfiles to family " + Bytes.toStringBinary(lqi.family)
470             + " of region with start key "
471             + Bytes.toStringBinary(e.getKey()));
472           return false;
473         }
474       }
475     }
476     return true;
477   }
478 
479   /**
480    * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
481    * bulk load region targets.
482    */
483   private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
484       ExecutorService pool, Deque<LoadQueueItem> queue,
485       final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
486     // <region start key, LQI> needs to be synchronized only within the scope of this
487     // phase because of the puts that happen in futures.
488     Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
489     final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
490 
491     // drain LQIs and figure out bulk load groups
492     Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
493     while (!queue.isEmpty()) {
494       final LoadQueueItem item = queue.remove();
495 
496       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
497         public List<LoadQueueItem> call() throws Exception {
498           List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
499           return splits;
500         }
501       };
502       splittingFutures.add(pool.submit(call));
503     }
504     // get all the results.  All grouping and splitting must finish before
505     // we can attempt the atomic loads.
506     for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
507       try {
508         List<LoadQueueItem> splits = lqis.get();
509         if (splits != null) {
510           queue.addAll(splits);
511         }
512       } catch (ExecutionException e1) {
513         Throwable t = e1.getCause();
514         if (t instanceof IOException) {
515           LOG.error("IOException during splitting", e1);
516           throw (IOException)t; // would have been thrown if not parallelized,
517         }
518         LOG.error("Unexpected execution exception during splitting", e1);
519         throw new IllegalStateException(t);
520       } catch (InterruptedException e1) {
521         LOG.error("Unexpected interrupted exception during splitting", e1);
522         throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
523       }
524     }
525     return regionGroups;
526   }
527 
528   // unique file name for the table
529   private String getUniqueName() {
530     return UUID.randomUUID().toString().replaceAll("-", "");
531   }
532 
533   protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
534       final HTable table, byte[] startKey,
535       byte[] splitKey) throws IOException {
536     final Path hfilePath = item.hfilePath;
537 
538     // We use a '_' prefix which is ignored when walking directory trees
539     // above.
540     final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
541 
542     LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
543     "region. Splitting...");
544 
545     String uniqueName = getUniqueName();
546     HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
547     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
548     Path topOut = new Path(tmpDir, uniqueName + ".top");
549     splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
550         botOut, topOut);
551 
552     FileSystem fs = tmpDir.getFileSystem(getConf());
553     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
554     fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
555 
556     // Return these so they are re-queued for grouping, lowering the
557     // chance that the region will just split again before we get to them.
558     List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
559     lqis.add(new LoadQueueItem(item.family, botOut));
560     lqis.add(new LoadQueueItem(item.family, topOut));
561 
562     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
563     return lqis;
564   }
565 
566   /**
567    * Attempt to assign the given load queue item into its target region group.
568    * If the hfile boundaries no longer fit into a single region, physically split
569    * the hfile so that the new bottom half will fit, and return the list of
570    * LQIs corresponding to the resulting hfiles.
571    *
572    * protected for testing
573    * @throws IOException
574    */
575   protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
576       final LoadQueueItem item, final HTable table,
577       final Pair<byte[][], byte[][]> startEndKeys)
578       throws IOException {
579     final Path hfilePath = item.hfilePath;
580     HFile.Reader hfr = HFile.createReader(fs, hfilePath,
581         new CacheConfig(getConf()), getConf());
582     final byte[] first, last;
583     try {
584       hfr.loadFileInfo();
585       first = hfr.getFirstRowKey();
586       last = hfr.getLastRowKey();
587     }  finally {
588       hfr.close();
589     }
590 
591     LOG.info("Trying to load hfile=" + hfilePath +
592         " first=" + Bytes.toStringBinary(first) +
593         " last="  + Bytes.toStringBinary(last));
594     if (first == null || last == null) {
595       assert first == null && last == null;
596       // TODO what if this is due to a bad HFile?
597       LOG.info("hfile " + hfilePath + " has no entries, skipping");
598       return null;
599     }
600     if (Bytes.compareTo(first, last) > 0) {
601       throw new IllegalArgumentException(
602       "Invalid range: " + Bytes.toStringBinary(first) +
603       " > " + Bytes.toStringBinary(last));
604     }
605     int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
606         Bytes.BYTES_COMPARATOR);
607     if (idx < 0) {
608       // not an exact match on a region start key; binarySearch returned -(insertion point) - 1,
609       // so calculate the index of the region the key falls in.
610       idx = -(idx + 1) - 1;
611     }
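        // Worked example (hypothetical keys): with region start keys {"", "d", "m"} and
        // first = "f", binarySearch returns -3 (insertion point 2), so
        // idx = -(-3 + 1) - 1 = 1, i.e. the region [d, m) that contains "f".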
612     final int indexForCallable = idx;
613 
614     /**
615      * We consider there to be a region hole in the following conditions: 1) if idx < 0, then the
616      * first region info is lost; 2) if the end key of a region is not equal to the start key of the
617      * next region; 3) if the end key of the last region is not empty.
618      */
619     if (indexForCallable < 0) {
620       throw new IOException("The first region info for table "
621           + Bytes.toString(table.getTableName())
622           + " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
623     } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
624         && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
625       throw new IOException("The last region info for table "
626           + Bytes.toString(table.getTableName())
627           + " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
628     } else if (indexForCallable + 1 < startEndKeys.getFirst().length
629         && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
630           startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
631       throw new IOException("The endkey of one region for table "
632           + Bytes.toString(table.getTableName())
633           + " is not equal to the startkey of the next region in hbase:meta. "
634           + "Please use the hbck tool to fix it first.");
635     }
636 
637     boolean lastKeyInRange =
638       Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
639       Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
640     if (!lastKeyInRange) {
641       List<LoadQueueItem> lqis = splitStoreFile(item, table,
642           startEndKeys.getFirst()[indexForCallable],
643           startEndKeys.getSecond()[indexForCallable]);
644       return lqis;
645     }
646 
647     // group regions.
648     regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
649     return null;
650   }
651 
652   /**
653    * @deprecated Use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)}
654    */
655   @Deprecated
656   protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
657       final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
658   throws IOException {
659     return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
660   }
661 
662   /**
663    * Attempts to do an atomic load of many hfiles into a region.  If it fails,
664    * it returns a list of hfiles that need to be retried.  If it is successful
665    * it will return an empty list.
666    *
667    * NOTE: To maintain row atomicity guarantees, the region server callable should
668    * succeed or fail atomically.
669    *
670    * Protected for testing.
671    *
672    * @return empty list if success, list of items to retry on recoverable
673    * failure
674    */
675   protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
676       final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
677   throws IOException {
678     final List<Pair<byte[], String>> famPaths =
679       new ArrayList<Pair<byte[], String>>(lqis.size());
680     for (LoadQueueItem lqi : lqis) {
681       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
682     }
683 
684     final RegionServerCallable<Boolean> svrCallable =
685         new RegionServerCallable<Boolean>(conn, tableName, first) {
686       @Override
687       public Boolean call() throws Exception {
688         SecureBulkLoadClient secureClient = null;
689         boolean success = false;
690 
691         try {
692           LOG.debug("Going to connect to server " + getLocation() + " for row "
693               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
694           byte[] regionName = getLocation().getRegionInfo().getRegionName();
695           if (!isSecureBulkLoadEndpointAvailable()) {
696             success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
697           } else {
698             HTable table = new HTable(conn.getConfiguration(), getTableName());
699             secureClient = new SecureBulkLoadClient(table);
700             success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
701               bulkToken, getLocation().getRegionInfo().getStartKey());
702           }
703           return success;
704         } finally {
705           //Best effort copying of files that might not have been imported
706           //from the staging directory back to original location
707           //in user directory
708           if(secureClient != null && !success) {
709             FileSystem targetFs = FileSystem.get(getConf());
710             // Check to see if the source and target filesystems are the same
711             // If they are the same filesystem, we will try move the files back
712             // because previously we moved them to the staging directory.
713             if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
714               for(Pair<byte[], String> el : famPaths) {
715                 Path hfileStagingPath = null;
716                 Path hfileOrigPath = new Path(el.getSecond());
717                 try {
718                   hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
719                     hfileOrigPath.getName());
720                   if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
721                     LOG.debug("Moved back file " + hfileOrigPath + " from " +
722                         hfileStagingPath);
723                   } else if(targetFs.exists(hfileStagingPath)){
724                     LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
725                         hfileStagingPath);
726                   }
727                 } catch(Exception ex) {
728                   LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
729                       hfileStagingPath, ex);
730                 }
731               }
732             }
733           }
734         }
735       }
736     };
737 
738     try {
739       List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
740       Configuration conf = getConf();
741       boolean success = RpcRetryingCallerFactory.instantiate(conf, null).<Boolean> newCaller()
742           .callWithRetries(svrCallable);
743       if (!success) {
744         LOG.warn("Attempt to bulk load region containing "
745             + Bytes.toStringBinary(first) + " into table "
746             + tableName  + " with files " + lqis
747             + " failed.  This is recoverable and they will be retried.");
748         toRetry.addAll(lqis); // return lqi's to retry
749       }
750       // success
751       return toRetry;
752     } catch (IOException e) {
753       LOG.error("Encountered unrecoverable error from region server, additional details: "
754           + svrCallable.getExceptionMessageAdditionalDetail(), e);
755       throw e;
756     }
757   }
758 
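      /**
       * A configuration sketch (hypothetical hbase-site.xml value) under which this check passes,
       * assuming the SecureBulkLoadEndpoint coprocessor is actually deployed on the region servers:
       * <pre>
       *   hbase.coprocessor.region.classes =
       *       org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint
       * </pre>
       */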
759   private boolean isSecureBulkLoadEndpointAvailable() {
760     String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
761     return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
762   }
763 
764   /**
765    * Split a storefile into a top and bottom half, maintaining
766    * the metadata, recreating bloom filters, etc.
767    */
768   static void splitStoreFile(
769       Configuration conf, Path inFile,
770       HColumnDescriptor familyDesc, byte[] splitKey,
771       Path bottomOut, Path topOut) throws IOException
772   {
773     // Open reader with no block cache, and not in-memory
774     Reference topReference = Reference.createTopReference(splitKey);
775     Reference bottomReference = Reference.createBottomReference(splitKey);
776 
777     copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
778     copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
779   }
780 
781   /**
782    * Copy half of an HFile into a new HFile.
783    */
784   private static void copyHFileHalf(
785       Configuration conf, Path inFile, Path outFile, Reference reference,
786       HColumnDescriptor familyDescriptor)
787   throws IOException {
788     FileSystem fs = inFile.getFileSystem(conf);
789     CacheConfig cacheConf = new CacheConfig(conf);
790     HalfStoreFileReader halfReader = null;
791     StoreFile.Writer halfWriter = null;
792     try {
793       halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
794       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
795 
796       int blocksize = familyDescriptor.getBlocksize();
797       Algorithm compression = familyDescriptor.getCompression();
798       BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
799       HFileContext hFileContext = new HFileContextBuilder()
800                                   .withCompression(compression)
801                                   .withChecksumType(HStore.getChecksumType(conf))
802                                   .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
803                                   .withBlockSize(blocksize)
804                                   .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
805                                   .build();
806       halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
807           fs)
808               .withFilePath(outFile)
809               .withBloomType(bloomFilterType)
810               .withFileContext(hFileContext)
811               .build();
812       HFileScanner scanner = halfReader.getScanner(false, false, false);
813       scanner.seekTo();
814       do {
815         KeyValue kv = scanner.getKeyValue();
816         halfWriter.append(kv);
817       } while (scanner.next());
818 
819       for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
820         if (shouldCopyHFileMetaKey(entry.getKey())) {
821           halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
822         }
823       }
824     } finally {
825       if (halfWriter != null) halfWriter.close();
826       if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
827     }
828   }
829 
830   private static boolean shouldCopyHFileMetaKey(byte[] key) {
831     return !HFile.isReservedFileInfoKey(key);
832   }
833 
834   private boolean doesTableExist(TableName tableName) throws Exception {
835     return hbAdmin.tableExists(tableName);
836   }
837 
838   /*
839    * Infers region boundaries for a new table.
840    * Parameter:
841    *   bdryMap is a map from keys to an integer in {+1, -1}
842    *     If a key is a start key of a file, then it maps to +1
843    *     If a key is an end key of a file, then it maps to -1
844    * Algo:
845    * 1) Poll on the keys in order:
846    *    a) Keep adding the mapped values to these keys (runningSum)
847    *    b) Each time runningSum returns to 0, add the start key of the current run to the boundary list.
848    * 2) Return the boundary list.
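       *
       * Worked example (hypothetical keys): two non-overlapping files [a,c) and [d,f) give
       * bdryMap = {a:+1, c:-1, d:+1, f:-1}. Walking the keys in order, runningSum returns to 0
       * at c (the first boundary, which is skipped) and again at f, where the current run started
       * at d, so the inferred boundary list is [d] and the table is created with split key d.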
849    */
850   public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
851     ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
852     int runningValue = 0;
853     byte[] currStartKey = null;
854     boolean firstBoundary = true;
855 
856     for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
857       if (runningValue == 0) currStartKey = item.getKey();
858       runningValue += item.getValue();
859       if (runningValue == 0) {
860         if (!firstBoundary) keysArray.add(currStartKey);
861         firstBoundary = false;
862       }
863     }
864 
865     return keysArray.toArray(new byte[0][0]);
866   }
867 
868   /*
869    * If the table is created for the first time, then "completebulkload" reads the files twice.
870    * More modifications are necessary if we want to avoid doing this.
871    */
872   private void createTable(TableName tableName, String dirPath) throws Exception {
873     final Path hfofDir = new Path(dirPath);
874     final FileSystem fs = hfofDir.getFileSystem(getConf());
875 
876     // Add column families
877     // Build a set of keys
878     final HTableDescriptor htd = new HTableDescriptor(tableName);
879     final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
880     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
881       @Override
882       public HColumnDescriptor bulkFamily(final byte[] familyName) {
883         HColumnDescriptor hcd = new HColumnDescriptor(familyName);
884         htd.addFamily(hcd);
885         return hcd;
886       }
887       @Override
888       public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
889           throws IOException {
890         Path hfile = hfileStatus.getPath();
891         HFile.Reader reader = HFile.createReader(fs, hfile,
892             new CacheConfig(getConf()), getConf());
893         try {
894           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
895             hcd.setCompressionType(reader.getFileContext().getCompression());
896             LOG.info("Setting compression " + hcd.getCompressionType().name() +
897                      " for family " + hcd.toString());
898           }
899           reader.loadFileInfo();
900           byte[] first = reader.getFirstRowKey();
901           byte[] last  = reader.getLastRowKey();
902 
903           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
904             " first=" + Bytes.toStringBinary(first) +
905             " last="  + Bytes.toStringBinary(last));
906 
907           // To eventually infer start key-end key boundaries
908           Integer value = map.containsKey(first)? map.get(first):0;
909           map.put(first, value+1);
910 
911           value = map.containsKey(last)? map.get(last):0;
912           map.put(last, value-1);
913         } finally {
914           reader.close();
915         }
916       }
917     });
918 
919     byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
920     this.hbAdmin.createTable(htd,keys);
921 
922     LOG.info("Table " + tableName + " is available!");
923   }
924 
925   @Override
926   public int run(String[] args) throws Exception {
927     if (args.length != 2) {
928       usage();
929       return -1;
930     }
931 
932     initialize();
933 
934     String dirPath = args[0];
935     TableName tableName = TableName.valueOf(args[1]);
936 
937     boolean tableExists = this.doesTableExist(tableName);
938     if (!tableExists) {
939       if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
940         this.createTable(tableName, dirPath);
941       } else {
942         String errorMsg = format("Table '%s' does not exist.", tableName);
943         LOG.error(errorMsg);
944         throw new TableNotFoundException(errorMsg);
945       }
946     }
947 
948     Path hfofDir = new Path(dirPath);
949     HTable table = new HTable(getConf(), tableName);
950 
951     doBulkLoad(hfofDir, table);
952     return 0;
953   }
954 
955   public static void main(String[] args) throws Exception {
956     Configuration conf = HBaseConfiguration.create();
957     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
958     System.exit(ret);
959   }
960 
961 }