HBase多表 bulkLoad

在 HBase 中快速导入大量数据可以使用 bulkload，但官方只提供了单表 load 的相关案例。查看源码后发现，MultiTableHFileOutputFormat 可以支持多表 bulkload。
example

package com.cc.example;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Function;

/**
 * Example MapReduce driver that bulk-loads one text file into several HBase tables at once.
 *
 * <p>The job writes HFiles through {@link MultiTableHFileOutputFormat} and then hands each
 * table's output directory to {@link LoadIncrementalHFiles}. The job-configuration helpers in
 * this class mirror the package-private ones of HBase's own HFile output format, which cannot be
 * called from outside the HBase package.
 *
 * <p>Every map output key is a composite key of the form
 * {@code <table name> + tableSeparator + <suffix>}; the output format uses the part before the
 * separator to pick the table the cell is written for.
 */
public class MultiTableHBaseBulkLoad {

  /** Pairs a table's descriptor with the locator used to read its region boundaries. */
  static class TableInfo {
    private final TableDescriptor tableDescriptor;
    private final RegionLocator regionLocator;

    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
      this.tableDescriptor = tableDescriptor;
      this.regionLocator = regionLocator;
    }

    /**
     * The modification for the returned HTD doesn't affect the inner TD.
     *
     * @return A clone of inner table descriptor
     * @see #getTableDescriptor()
     * @deprecated since 2.0.0 and will be removed in 3.0.0 (HBASE-18241). Use
     *     {@link #getTableDescriptor()} instead.
     */
    @Deprecated
    public HTableDescriptor getHTableDescriptor() {
      return new HTableDescriptor(tableDescriptor);
    }

    public TableDescriptor getTableDescriptor() {
      return tableDescriptor;
    }

    public RegionLocator getRegionLocator() {
      return regionLocator;
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(MultiTableHBaseBulkLoad.class);

  public static final String LOCALITY_SENSITIVE_CONF_KEY =
      "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
  static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
      "hbase.mapreduce.use.multi.table.hfileoutputformat";

  // The following keys transfer per-column-family settings from job setup to the
  // reducer-side record writer through the job configuration.
  // These should not be changed by the client.
  static final String COMPRESSION_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.compression";
  static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
      "hbase.hfileoutputformat.families.bloomparam";
  static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

  /**
   * Separator between table name and suffix in a composite key. It must be the single byte
   * {@code ';'}: that is what {@link MultiTableHFileOutputFormat} splits keys on, and a longer
   * value makes the empty-start-key check in {@link #writePartitions} fail.
   */
  protected static final byte[] tableSeparator = Bytes.toBytes(";");

  /**
   * Turns one CSV line {@code <tableKey>,<row>,<qualifier>,<value>} into a {@link KeyValue}.
   *
   * <p>{@code tableKey} is used verbatim (after trimming) as the map output key, so it must
   * already carry the table separator, e.g. {@code t1;}. The row key written to HBase is
   * {@code <currentTimeMillis>_<row>}.
   */
  static class HFileImportMapper2
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    protected final String CF_KQ = "cf";

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String lineText = value.toString();
      String[] splitData = lineText.split(",");
      if (splitData.length < 4) {
        // Blank or truncated line: skip it instead of failing the task with an
        // ArrayIndexOutOfBoundsException, but keep it visible in the job counters.
        context.getCounter("MultiTableHBaseBulkLoad", "MALFORMED_LINES").increment(1);
        return;
      }

      // Composite table key (table name + separator), e.g. "t1;".
      String tbName = splitData[0].trim();
      // Row key.
      String row = System.currentTimeMillis() + "_" + splitData[1];

      ImmutableBytesWritable tableKey = new ImmutableBytesWritable(Bytes.toBytes(tbName));
      // Bytes.toBytes always encodes as UTF-8; String.getBytes() depends on the platform charset.
      KeyValue kv = new KeyValue(
          Bytes.toBytes(row),
          Bytes.toBytes(CF_KQ),
          Bytes.toBytes(splitData[2]),
          Bytes.toBytes(splitData[3]));
      context.write(tableKey, kv);
    }
  }

  /**
   * Emits all {@link KeyValue}s of one key in {@link CellComparator} order, which is the order
   * HFiles must be written in. All values of a key are buffered in memory.
   */
  static class CellSortReducer2
      extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> values, Context context)
        throws IOException, InterruptedException {
      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.getInstance());
      for (KeyValue kv : values) {
        try {
          // Clone before storing: the instance is copied defensively rather than
          // retained across iterations.
          map.add(kv.clone());
        } catch (CloneNotSupportedException e) {
          throw new IOException(e);
        }
      }
      context.setStatus("Read " + map.getClass());
      int index = 0;
      for (KeyValue kv : map) {
        context.write(row, kv);
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index);
        }
      }
    }
  }

  /**
   * Runs the example: generates HFiles for tables {@code default:t1} and {@code default:t2} and
   * bulk-loads them. Exits the JVM with status 1 when the job or the load fails.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    boolean succeeded;
    try {
      succeeded = run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Multi-table bulk load was interrupted", e);
      succeeded = false;
    } catch (Exception e) {
      LOG.error("Multi-table bulk load failed", e);
      succeeded = false;
    }
    if (!succeeded) {
      System.exit(1);
    }
  }

  /** Configures and runs the HFile-generating job, then loads the result. */
  private static boolean run() throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID());
    String input = "hdfs://nameservice2/tmp/person.txt";
    String output = "hdfs://nameservice2/tmp/pres";
    TableName[] tableNames =
        new TableName[] {TableName.valueOf("default:t1"), TableName.valueOf("default:t2")};

    deleteOutputDirectory(conf, output);

    Job job = Job.getInstance(conf);
    job.setJobName("HBase HFile");
    job.setJarByClass(MultiTableHBaseBulkLoad.class);
    // Input format.
    job.setInputFormatClass(TextInputFormat.class);
    // Map stage; the output classes are declared explicitly so that
    // configureIncrementalLoad picks the reducer from them rather than from a fallback.
    job.setMapperClass(HFileImportMapper2.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setReducerClass(CellSortReducer2.class);
    // Input/output paths.
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, new Path(output));

    configureJobForTables(job, conf, tableNames);

    if (!job.waitForCompletion(true)) {
      LOG.error("HFile generation job failed; nothing was loaded");
      return false;
    }
    loadGeneratedHFiles(conf, output, tableNames);
    return true;
  }

  /** Best-effort removal of a previous run's output directory. */
  private static void deleteOutputDirectory(Configuration conf, String output) {
    try {
      // FileSystem.get returns a shared, cached instance, so it is deliberately not closed here.
      FileSystem fs = FileSystem.get(URI.create(output), conf);
      fs.delete(new Path(output), true);
    } catch (IOException e) {
      LOG.warn("Could not delete output directory {}", output, e);
    }
  }

  /** Reads descriptors and region boundaries of all tables and configures the job with them. */
  private static void configureJobForTables(Job job, Configuration conf, TableName[] tableNames)
      throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      List<TableInfo> tableInfoList = new ArrayList<>();
      List<RegionLocator> openLocators = new ArrayList<>();
      try {
        for (TableName tableName : tableNames) {
          RegionLocator regionLocator = conn.getRegionLocator(tableName);
          openLocators.add(regionLocator);
          try (Table table = conn.getTable(tableName)) {
            tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
          }
        }
        configureIncrementalLoad(job, tableInfoList, MultiTableHFileOutputFormat.class);
      } finally {
        for (RegionLocator locator : openLocators) {
          try {
            locator.close();
          } catch (IOException e) {
            LOG.warn("Failed to close region locator for {}", locator.getName(), e);
          }
        }
      }
    }
  }

  /** Bulk-loads the HFiles found under {@code output/<namespace>/<table>} into each table. */
  private static void loadGeneratedHFiles(
      Configuration conf, String output, TableName[] tableNames) throws IOException {
    try (Connection hbaseConn = ConnectionFactory.createConnection(conf)) {
      for (TableName table : tableNames) {
        String fullName = table.getNameWithNamespaceInclAsString();
        Path tableOutputPath = new Path(new Path(output), fullName.replace(":", "/"));
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        try (Admin admin = hbaseConn.getAdmin();
            Table htable = hbaseConn.getTable(table);
            RegionLocator regionLocator = hbaseConn.getRegionLocator(table)) {
          LOG.info("Loading HFiles for {} from {}", fullName, tableOutputPath);
          loader.doBulkLoad(tableOutputPath, admin, htable, regionLocator);
          LOG.info("Incremental load complete for table=" + fullName);
        }
      }
    }
  }

  /**
   * Configures {@code job} to write HFiles that line up with the current regions of the given
   * tables: output key/value/format classes, sorting reducer, serializations, one reduce task per
   * region, a {@link TotalOrderPartitioner} over the region start keys, and the per-family
   * compression, block size, bloom filter and data block encoding settings.
   *
   * @param job the job to configure
   * @param multiTableInfo the target tables; must not contain the same instance twice
   * @param cls the output format; {@link MultiTableHFileOutputFormat} enables composite keys
   * @throws IOException if region boundaries cannot be read or the partition file cannot be written
   * @throws IllegalArgumentException if {@code multiTableInfo} contains duplicates
   */
  static void configureIncrementalLoad(
      Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls)
      throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
    }

    boolean writeMultipleTables = false;
    if (MultiTableHFileOutputFormat.class.equals(cls)) {
      writeMultipleTables = true;
      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
    }

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())
        || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(CellSortReducer2.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(TextSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      LOG.info("bulkload locality sensitive enabled");
    }

    /* Now get the region start keys for every table required */
    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());

    for (TableInfo tableInfo : multiTableInfo) {
      regionLocators.add(tableInfo.getRegionLocator());
      String tn = writeMultipleTables
          ? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
          : tableInfo.getRegionLocator().getName().getNameAsString();
      allTableNames.add(tn);
      tableDescriptors.add(tableInfo.getTableDescriptor());
    }
    // Record tablenames for creating writer by favored nodes, and decoding compression,
    // block size and other attributes of columnfamily per table
    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
    List<ImmutableBytesWritable> startKeys =
        getRegionStartKeys(regionLocators, writeMultipleTables);
    // Use table's region boundaries for TOP split points.
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
        + "to match current region count for all tables");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys, writeMultipleTables);
    // Set compression algorithms based on column families
    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(bloomParamDetails, tableDescriptors));
    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
  }

  /**
   * Configure job with a TotalOrderPartitioner, partitioning against
   * splitPoints. The partitions file is registered for deletion when the file system is closed.
   */
  static void configurePartitioner(
      Job job, List<ImmutableBytesWritable> splitPoints, boolean writeMultipleTables)
      throws IOException {
    Configuration conf = job.getConfiguration();
    // create the partitions file
    FileSystem fs = FileSystem.get(conf);
    String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        fs.getHomeDirectory() + "/hbase-staging");
    // makeQualified returns a new Path; the result must be kept for it to have any effect.
    Path partitionsPath =
        fs.makeQualified(new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()));
    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
    fs.deleteOnExit(partitionsPath);

    // configure job to use it
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  }

  /**
   * Write out a {@link SequenceFile} that can be read by
   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
   *
   * @throws IllegalArgumentException if {@code startKeys} is empty or the smallest start key is
   *     not the empty key
   */
  @SuppressWarnings("deprecation")
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
    LOG.info("Writing partition information to " + partitionsPath);
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
    ImmutableBytesWritable first = sorted.first();
    if (writeMultipleTables) {
      // Composite key: only the part after the table name is the region start key.
      first = new ImmutableBytesWritable(getSuffix(sorted.first().get()));
    }
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(sorted.first());

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    try (SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class)) {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    }
  }

  /**
   * Serializes one attribute of every column family of every table as
   * {@code <urlenc(table;family)>=<urlenc(value)>} pairs joined with {@code '&'}.
   *
   * @param fn extracts the attribute value from a column family descriptor
   * @param allTables the tables whose families are serialized
   * @return the serialized attribute map, or an empty string as soon as a {@code null}
   *     descriptor is encountered
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
  static String serializeColumnFamilyAttribute(
      Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables)
      throws UnsupportedEncodingException {
    StringBuilder attributeValue = new StringBuilder();
    int i = 0;
    for (TableDescriptor tableDescriptor : allTables) {
      if (tableDescriptor == null) {
        // could happen with mock table instance
        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
        return "";
      }
      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
        if (i++ > 0) {
          attributeValue.append('&');
        }
        attributeValue.append(URLEncoder.encode(
            Bytes.toString(combineTableNameSuffix(
                tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
            "UTF-8"));
        attributeValue.append('=');
        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
      }
    }
    return attributeValue.toString();
  }

  /**
   * Return the start keys of all of the regions of the given tables,
   * as a list of ImmutableBytesWritable. With {@code writeMultipleTables} each key is prefixed
   * with its table name and the table separator.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(
      List<RegionLocator> regionLocators, boolean writeMultipleTables) throws IOException {
    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
    for (RegionLocator regionLocator : regionLocators) {
      TableName tableName = regionLocator.getName();
      LOG.info("Looking up current regions for table " + tableName);
      byte[][] byteKeys = regionLocator.getStartKeys();
      for (byte[] byteKey : byteKeys) {
        byte[] fullKey = byteKey; // single-table use case
        if (writeMultipleTables) {
          // MultiTableHFileOutputFormat use case
          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("SplitPoint startkey for table [" + tableName + "]: ["
              + Bytes.toStringBinary(fullKey) + "]");
        }
        ret.add(new ImmutableBytesWritable(fullKey));
      }
    }
    return ret;
  }

  /** Maps a column family to the name of its compression algorithm. */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> compressionDetails =
      familyDescriptor -> familyDescriptor.getCompressionType().getName();

  /**
   * Serialize column family to block size map to configuration. Invoked while
   * configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
      familyDescriptor -> String.valueOf(familyDescriptor.getBlocksize());

  /**
   * Serialize column family to bloom type map to configuration. Invoked while
   * configuring the MR job for incremental load. Falls back to the default bloom filter type
   * when the family reports none.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
    // Check the enum itself: calling toString() first would throw before any null check.
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    if (bloomType == null) {
      return ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
    }
    return bloomType.toString();
  };

  /**
   * Serialize column family to bloom param map to configuration. Invoked while
   * configuring the MR job for incremental load. The parameter is the prefix length or the
   * delimiter for row-prefix bloom filters and an empty string otherwise.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
    BloomType bloomType = familyDescriptor.getBloomFilterType();
    String bloomParam = "";
    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
    } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.DELIMITER_KEY);
    }
    // A missing configuration value must not reach URLEncoder.encode as null.
    return bloomParam == null ? "" : bloomParam;
  };

  /** Builds the composite key {@code tableName + tableSeparator + suffix}. */
  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
    return Bytes.add(tableName, tableSeparator, suffix);
  }

  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   */
  @VisibleForTesting
  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails =
      familyDescriptor -> {
        DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
        if (encoding == null) {
          encoding = DataBlockEncoding.NONE;
        }
        return encoding.toString();
      };

  /**
   * Locates the table separator in a composite key.
   *
   * @return index of the first byte of the separator
   * @throws IllegalArgumentException if the key contains no separator
   */
  private static int validateCompositeKey(byte[] keyBytes) {
    int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
    if (separatorIdx == -1) {
      throw new IllegalArgumentException("Invalid format for composite key ["
          + Bytes.toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
    }
    return separatorIdx;
  }

  /**
   * Returns the part of a composite key that follows the table separator.
   *
   * @throws IllegalArgumentException if the key contains no separator
   */
  protected static byte[] getSuffix(byte[] keyBytes) {
    int separatorIdx = validateCompositeKey(keyBytes);
    // Skip the whole separator, whatever its length, rather than assuming one byte.
    int suffixStart = separatorIdx + tableSeparator.length;
    return Bytes.copy(keyBytes, suffixStart, keyBytes.length - suffixStart);
  }
}

因为是 CDH 6.3.2 的环境，运行 MapReduce 之前需要先加载一些环境变量。
# CDH 中 MR 执行需要一些环境变量等.
1. export HBASE_HOME=/opt/cloudera/parcels/CDH/lib/hbase
2. export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:classpath:/opt/phoenix-bulkload/hbase-conf
3. hadoop jar hbase-multitable-1.0.0.jar com.cc.example.MultiTableHBaseBulkLoad

person.txt
t1;,row1,smartloli1,100
t1;,row2,smartloli2,101
t2;,row3,smartloli1,103
t2;,row4,smartloli2,102

建表
create 't1','cf'
create 't2','cf'

    推荐阅读