1、使用sqoop直接导入
(1)创建Hbase表
-- 1、如果用户表存在先删除
hbase(main):013:0> disable 'tbl_users' hbase(main):014:0> drop 'tbl_users'
-- 或者清空表
hbase(main):015:0> truncate 'tbl_users'
-- 2、创建用户表
hbase(main):016:0> create 'tbl_users','detail'
hbase(main):019:0> desc "tbl_users" Table tbl_users is ENABLED tbl_users COLUMN FAMILIES DESCRIPTION {NAME => 'detail', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
(2)sqoop全量导入Hbase中
可以使用SQOOP将MySQL表的数据导入到HBase表中,指定 表的名称、列簇及RowKey
/export/servers/sqoop/bin/sqoop import \
-D sqoop.hbase.add.row.key=true \
--connect jdbc:mysql://bigdata-cdh01:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_users \
--hbase-create-table \
--hbase-table tbl_users \
--column-family detail \
--hbase-row-key id \
--num-mappers 2
参数含义解释:
1、-D sqoop.hbase.add.row.key=true 是否将rowkey相关字段写入列族中,默认为false,默认情况下你将在列族中看不到任何row key中的字段。注意,该参数必须放在import之后。
2、--hbase-create-table 如果hbase中该表不存在则创建 3、--hbase-table 对应的hbase表名 4、--hbase-row-key hbase表中的rowkey,注意格式 5、--column-family hbase表的列族
(3)sqoop增量导入Hbase中
/export/servers/sqoop/bin/sqoop import \
-D sqoop.hbase.add.row.key=true \
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_logs \
--hbase-create-table \
--hbase-table tag_logs \
--column-family detail \
--hbase-row-key id \
--num-mappers 20 \
--incremental lastmodified \
--check-column log_time \
--last-value '2019-08-13 00:00:00' \
相关增量导入参数说明:
1、--incremental lastmodified 增量导入支持两种模式 append 递增的列;lastmodified 时间戳。 2、--check-column 增量导入时参考的列 3、--last-value 最小值,这个例子中表示导入2019-08-13 00:00:00到今天的值
注意:
使用SQOOP导入数据到HBase表中,有一个限制:
需要指定RDBMs表中的某个字段作为HBase表的ROWKEY,如果HBase表的ROWKEY为多
个字段组合,就无法指定,所以此种方式有时候不能使用。
2、Hbase自带工具—HBase ImportTSV 【spark|MySQL数据库数据迁移到Hbase的几种方法】HBase ImportTSV将tsv(也可以是csv,每行数据中各个字段使用分隔符分割)格式文本数据,加载到HBase表中。
(1) 采用Put方式加载导入
采用Put方式向HBase表中插入数据流程:
Put
-> WAL 预写日志
-> MemStore(内存) ,当达到一定大写Spill到磁盘上:
1) 先导入数据至Hive表 使用Sqoop将MySQL数据库表中的数据导入到Hive表中(本质就是存储在HDFS上)
/export/servers/sqoop/bin/sqoop import \
--connect jdbc:mysql://bigdata-cdh01:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_users \
--direct \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--hive-table tags_dat.tbl_users \
--hive-import \
--num-mappers 1
2) 从Hive表到Hbase
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar \ importtsv \
-
Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:site_global_ticket,detail:site_global_session,detail:global_user_id,detail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log _time \
tbl_logs \
/user/hive/warehouse/tags_dat.db/tbl_logs
(2) 将数据直接保存为HFile文件,然后加载到HBase表中
# 1. 生成HFILES文件
HADOOP_HOME=/export/servers/hadoop
HBASE_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar \
importtsv \
-Dimporttsv.bulk.output=hdfs://bigdata-cdh01:8020/datas/output_hfile/tbl_logs \
-
Dimporttsv.columns=HBASE_ROW_KEY,detail:log_id,detail:remote_ip,detail:site_global_ticket,detail:site_global_session,detail:global_user_id,detail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail: log_time \
tbl_logs \
/user/hive/warehouse/tags_dat.db/tbl_logs # 2. 将HFILE文件加载到表中
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${HBASE_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar \
completebulkload \
hdfs://bigdata-cdh01:8020/datas/output_hfile/tbl_logs \
tbl_logs
注意:
1)、ROWKEY不能是组合主键 只能是某一个字段
2)、当表中列很多时,书写-Dimporttsv.columns值时很麻烦,容易出错
3、HBase Bulkload 在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。
1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,
在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘
(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造
成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统
性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。
文章图片
2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。
文章图片
(1)编写MR程序
package com.yyds.tags.mr.etl;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
/**
* 定义常量
*/
interface Constants {
// hive表数据目录
String INPUT_PATH = "hdfs://bigdata-cdh01:8020/user/hive/warehouse/tags_dat.db/tbl_logs";
// 生成的hfile目录
String HFILE_PATH = "hdfs://bigdata-cdh01:8020/datas/output_hfile/tbl_logs";
// 表名
String TABLE_NAME = "tbl_logs";
// 列簇名称
byte[] COLUMN_FAMILY = Bytes.toBytes("detail");
// 表字段
List list = new ArrayList() {
private static final long serialVersionUID = -6125158551837044300L;
{
add(Bytes.toBytes("id"));
add(Bytes.toBytes("log_id"));
add(Bytes.toBytes("remote_ip"));
add(Bytes.toBytes("site_global_ticket"));
add(Bytes.toBytes("site_global_session"));
add(Bytes.toBytes("global_user_id"));
add(Bytes.toBytes("cookie_text"));
add(Bytes.toBytes("user_agent"));
add(Bytes.toBytes("ref_url"));
add(Bytes.toBytes("loc_url"));
add(Bytes.toBytes("log_time"));
}
};
}
package com.yyds.tags.mr.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* 将Hive表数据转换为HFile文件并移动HFile到HBase
*/
public class LoadLogsToHBaseMapReduce extends Configured implements Tool {// 连接HBase Connection对象
private static Connection connection = null;
/**
* 定义Mapper类,读取CSV格式数据,转换为Put对象,存储HBase表
*/
static class LoadLogsToHBase extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 按照分隔符分割数据,分隔符为 逗号
String[] split = value.toString().split("\\t");
if (split.length == Constants.list.size()) {
// 构建Put对象,将每行数据转换为Put
Put put = new Put(Bytes.toBytes(split[0]));
for (int i = 1;
i < Constants.list.size();
i++) {
put.addColumn(
Constants.COLUMN_FAMILY,
Constants.list.get(i),
Bytes.toBytes(split[i])
);
}
// 将数据输出
context.write(new ImmutableBytesWritable(put.getRow()), put);
}
}
}@Override
public int run(String[] strings) throws Exception {
// a. 获取配置信息对象
Configuration configuration = super.getConf() ;
// b. 构建Job对象Job
Job job = Job.getInstance(configuration);
job.setJobName(this.getClass().getSimpleName());
job.setJarByClass(LoadLogsToHBaseMapReduce.class);
// c. 设置Job
FileInputFormat.addInputPath(job, new Path(Constants.INPUT_PATH));
job.setMapperClass(LoadLogsToHBase.class);
// 设置输出格式为HFileOutputFormat2
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
// 判断输出目录是否存在,如果存在就删除
FileSystem hdfs = FileSystem.get(configuration) ;
Path outputPath = new Path(Constants.HFILE_PATH) ;
if(hdfs.exists(outputPath)){
hdfs.delete(outputPath, true) ;
}
// d. 设置输出路径
FileOutputFormat.setOutputPath(job, outputPath);
// 获取HBase Table,对HFileOutputFormat2进行设置
Table table = connection.getTable(TableName.valueOf(Constants.TABLE_NAME));
HFileOutputFormat2.configureIncrementalLoad(
job,
table,
connection.getRegionLocator(TableName.valueOf(Constants.TABLE_NAME)));
// 提交运行Job,返回是否执行成功
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}public static void main(String[] args) throws Exception {
// 获取Configuration对象,读取配置信息
Configuration configuration = HBaseConfiguration.create();
// 获取HBase 连接Connection对象
connection = ConnectionFactory.createConnection(configuration);
// 运行MapReduce将数据文件转换为HFile文件
int status = ToolRunner.run(configuration, new LoadLogsToHBaseMapReduce(), args);
System.out.println("HFile文件生成完毕!~~~");
// 运行成功时,加载HFile文件数据到HBase表中
if (0 == status) {
// 获取HBase Table句柄
Admin admin = connection.getAdmin();
Table table = connection.getTable(TableName.valueOf(Constants.TABLE_NAME));
// 加载数据到表中
LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
load.doBulkLoad(
new Path(Constants.HFILE_PATH),
admin,
table,
connection.getRegionLocator(TableName.valueOf(Constants.TABLE_NAME))
);
System.out.println("HFile文件移动完毕!~~~");
}
}
}
(2)编写spark程序
package com.yyds.tags.mr.etl.hfileimport scala.collection.immutable.TreeMap/**
* HBase 中各个表的字段名称,存储在TreeMap中
*/
object TableFieldNames{//使用TreeMap为qualifier做字典序排序// 行为日志数据表的字段
val LOG_FIELD_NAMES: TreeMap[String, Int] = TreeMap(
("id", 0),
("log_id", 1),
("remote_ip", 2),
("site_global_ticket", 3),
("site_global_session", 4),
("global_user_id", 5),
("cookie_text", 6),
("user_agent", 7),
("ref_url", 8),
("loc_url", 9),
("log_time", 10)
)
}
package com.yyds.tags.mr.etl.hfileimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.TreeMap/**
* @DESC: 将数据存储文本文件转换为HFile文件,加载到HBase表中
*
*HBase数据库提供批量导入数据至表功能,相关知识点如下:
*1、Hbase 中LoadIncrementalHFiles 支持向Hbase 写入HFile 文件
*2、写入的HFile 文件要求是排序的(rowKey,列簇,列)
*3、关键是绕过Hbase regionServer,直接写入Hbase文件
*4、Spark RDD的 repartitionAndSortWithinPartitions 方法可以高效地实现分区并排序
*5、JAVA util.TreeMap 是红黑树的实现,能很好的实现排序的要求
*/
object HBaseBulkLoader {/**
* 依据不同表的数据文件,提取对应数据,封装到KeyValue对象中
* 提取数据字段,构建二元组(RowKey, KeyValue)
* Key: rowkey + cf + column + version(timestamp)
* Value: ColumnValue
*
* @param line
* @param family
* @param fieldNames
* @return
*/def getLineToData(line: String, family: String, fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] = {
val length = fieldNames.size
// 分割字符串
val fieldValues: Array[String] = line.split("\\t", -1)
if (null == fieldValues || fieldValues.length != length) return Nil
// 获取id,构建RowKey
val id: String = fieldValues(0)
val rowKey = Bytes.toBytes(id)
val ibw: ImmutableBytesWritable = new ImmutableBytesWritable(rowKey)
// 列簇
val columnFamily: Array[Byte] = Bytes.toBytes(family)
// 构建KeyValue对象
fieldNames.toList.map { case (fieldName, fieldIndex) =>
// KeyValue实例对象
val keyValue = https://www.it610.com/article/new KeyValue(
rowKey, //
columnFamily, //
Bytes.toBytes(fieldName), //
Bytes.toBytes(fieldValues(fieldIndex)) //
)
// 返回
(ibw, keyValue)
}
}def main(args: Array[String]): Unit = {// 应用执行时传递5个参数:数据类型、HBase表名称、表列簇、输入路径及输出路径
/**
* args = Array ("1", "tbl_tag_logs", "detail", "/user/hive/warehouse/tags_dat.db/tbl_logs", "/datas/output_hfile/tbl_tag_logs")
*/
if (args.length != 5) {
println("Usage: required params: ")
System.exit(-1)
}
// 将传递赋值给变量, 其中数据类型:1Log、2Good、3User、4Order
val Array(dataType, tableName, family, inputDir, outputDir) = args// 依据参数获取处理数据schema
val fieldNames = dataType.toInt match {
case 1 => TableFieldNames.LOG_FIELD_NAMES
case 2 => TableFieldNames.GOODS_FIELD_NAMES
case 3 => TableFieldNames.USER_FIELD_NAMES
case 4 => TableFieldNames.ORDER_FIELD_NAMES
}// 1. 构建SparkContext实例对象
val sc: SparkContext = {
// a. 创建SparkConf,设置应用配置信息
val sparkConf = new SparkConf()
//.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// b. 传递SparkContext创建对象
SparkContext.getOrCreate(sparkConf)
}// 2. 读取文本文件数据,转换格式
val keyValuesRDD: RDD[(ImmutableBytesWritable, KeyValue)] = sc.textFile(inputDir)
.filter(line => null != line) // 过滤数据
.flatMap { line => getLineToData(line, family, fieldNames) }
.sortByKey()// TODO:构建Job,设置相关配置信息,主要为输出格式
// a. 读取配置信息
val conf: Configuration = HBaseConfiguration.create()
//Configuration parameter hbase.mapreduce.hfileoutputformat.table.name cannot be empty
conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
// b. 如果输出目录存在,删除
val dfs = FileSystem.get(conf)
val outputPath: Path = new Path(outputDir)
if (dfs.exists(outputPath)) {
dfs.delete(outputPath, true)
}
dfs.close()// TODO:c. 配置HFileOutputFormat2输出
val conn = ConnectionFactory.createConnection(conf)
val htableName = TableName.valueOf(tableName)
val table: Table = conn.getTable(htableName)
HFileOutputFormat2.configureIncrementalLoad(
Job.getInstance(conf), //
table, //
conn.getRegionLocator(htableName) //
)
// TODO: 3. 保存数据为HFile文件
keyValuesRDD
.sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整体有序
.saveAsNewAPIHadoopFile(
outputDir, //
classOf[ImmutableBytesWritable], //
classOf[KeyValue], //
classOf[HFileOutputFormat2], //
conf //
)
// TODO:4. 将输出HFile加载到HBase表中
val load = new LoadIncrementalHFiles(conf)
load.doBulkLoad(outputPath, conn.getAdmin, table,
conn.getRegionLocator(htableName))// 应用结束,关闭资源
sc.stop()
}}
推荐阅读
- Xshell用root用户连接Linux
- 校招|【校招 --阶段一 系统编程】线程控制
- Linux内核|LinuxKernel内核百炼成神之渡劫中断【贰】
- LINUX安装宝塔环境
- Linux系统安全及应用
- Linux|Nginx反向代理部署多个项目
- 快速上手Linux核心命令|快速上手Linux核心命令(二)(基础命令)
- 快速上手Linux核心命令|快速上手Linux核心命令(一)(核心命令简介)
- Spark|Spark 进程模型与分布式部署(什么是分布式计算())