文章目录
- Flume和Sqoop
-
- 一、Flume的功能与应用
-
- 1.功能
- 2.应用
- 二、Flume的基本组成
- 三、Flume的开发规则:
- 四、Flume开发测试
- 五、常用Source
-
- 1.Exec
- 2.Taildir
- 3.其他source
- 六、常见Channel
- 七、常见Sink
-
- 1.常用的Sink
- 2.Flume架构和高级组件
- 八、Sqoop的功能和作用
- 九、Sqoop导入:HDFS
- 十、Sqoop导入:Hive
- 十一、Sqoop导入:增量导入
- 十二、Sqoop导出:全量导出
- 十三、Sqoop导出:增量导出
- 十四、Sqoop Job
- 十五、Sqoop密码问题与脚本封装
Flume和Sqoop 一、Flume的功能与应用 1.功能
数据采集:将数据从一个地方采集到另一个地方
将数据进行了复制
大数据的数据采集:将各种需要处理的数据源复制到大数据仓库中
实现:实时数据流的数据采集,可以将不同各种数据源的数据采集到各种目标地
数据源:文件、网络端口
Flume:实时
目标地:HDFS、Hbase、Hive、Kafka
特点:
功能全面
所有的读取和写入的程序,都已经封装好了
只需要配置从哪读,写入哪里,就要可以实现采集
允许自定义开发
如果功能不能满足实际的业务需求,Flume提供各种接口,允许自定义开发
基于Java开发的应用程序
开发相对简单
所有功能都封装好了,只要调用即可
写一个配置文件:从哪读,都谁,写到哪去
可以实现分布式采集
分布式采集:每一台机器都可以用Flume进行采集
注意:Flume不是分布式架构
2.应用
应用于实时数据流采集场景
基于文件或者网络协议端口的数据流采集
美团的Flume设计架构
二、Flume的基本组成
文章图片
Agent:每个Agent就是一个Flume的程序,每个Agent由三部分组成:source、Channel、Sink
Source:负责读取数据,Source会动态监听数据源,将数据源新增的数据实时采集变成Event数据流,将每个Event发送到Channel中
- 每一条数据会变成一个Event
- 实时监听数据源
内存、文件(磁盘)
Sink:负责发送数据,从Channel中读取采集到的数据,将数据写入目标地
sink主动到Channel中读取数据
Event:用于构建每一条数据的对象,每一条数据就会变成一个Event,进行传递,最终写入目标
组成
- head:定义一些KV属性和配置,默认head时空的
- body:数据存在body中
理解:
Event
Map head;
byte[] body;
//每一条数据的字节流
三、Flume的开发规则: step1:开发一个Flume的参数配置文件
properties格式的文件:
#step1:定义一个agent:agent的名称、定义source、channel、sink
#step2:定义source:读什么、读哪
#step3:定义channel:缓存在什么地方
#step4:定义sink:写入什么地方
step2:运行flume的agent程序
flume -ng
Usage:bin/flume-ng [options]...
为什么叫Flume-ng?
- flume-og:老的版本,架构非常麻烦,性能非常差,后不用了
- flume-ng:现在用的版本
flume-ng agent -c -f -n
agent:表示要运行一个Flume程序
- -c :指定Flume的配置文件目录
- -f :要运行那个文件
- -n :运行的agent的名字是什么
四、Flume开发测试 需求:采集Hive的日志、临时缓存在内存中、将日志写入Flume的日志中并打印在命令中
source:采集一个文件数据
创建测试目录:
cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir usercase
复制官方示例:
cp conf/flume-conf.properties.template usercase/hive-mem-log.properties
hive-mem-log.properties:采集hive的日志临时缓存在内存中最终打印在日志中
文章图片
文章图片
文章图片
Exec Source
- 执行一条Linux的命令来实现采集
- 命令:搭配tail -f动态采集文件最新的内容
文章图片
文章图片
文章图片
- memory channel将数据缓存在内存中
文章图片
文章图片
Sink:Flume提供很多sink - Logger Sink 日志类型的Sink
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'#define the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1#define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000#define the sink
a1.sinks.k1.type = logger#bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行
#1.切换到指定目录
cd /export/server/flume-1.6.0-cdh5.14.0-bin/
#2.运行agent程序
flume-ng agent -c conf/ -f usercase/hive-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
- -Dflume.root.logger=INFO,console:将flume的日志打印在命令行
文章图片
五、常用Source 1.Exec
功能:通过执行一条Linux命令来实现数据量动态采集
- 固定搭配tail -F使用
2.Taildir
功能:从Apache Flume1.7版本开始支持,动态监听采集多个文件
- 如果用的是1.5或者1.6,遇到这个问题,需要自己手动编译这个功能
需求:让Flume动态监听一个文件和一个目录下的所有文件
准备
cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir position
mkdir -p /export/data/flume
echo " " >> /export/data/flume/bigdata01.txt
mkdir-p /export/data/flume/bigdata
开发
cp usercase/hive-mem-log.properties usercase/taildir-mem-log.properties
taildir-mem-log.properties
# define sourceName/channelName/sinkName for the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1# define the s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json
#将所有需要监控的数据源变成一个组,这个组内有两个数据源
a1.sources.s1.filegroups = f1 f2
#指定了f1是谁:监控一个文件
a1.sources.s1.filegroups.f1 = /export/data/flume/bigdata01.txt
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.headerKey1 = value1
#指定f2是谁:监控一个目录下的所有文件
a1.sources.s1.filegroups.f2 = /export/data/flume/bigdata/.*
#指定f2采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.fileHeader = true # define the c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# def the k1
a1.sinks.k1.type = logger#source、channel、sink bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行
flume-ng agent -c conf/ -f usercase/taildir-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
文章图片
元数据文件的功能:/export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json
问题:如果Flume程序故障,重启Flume程序,已经被采集过的数据还要不要采集?
需求:不需要,不能导致数据重复
功能:记录Flume所监听的每个文件已经被采集的位置
[
{"inode":34599996,"pos":14,"file":"/export/data/flume/bigdata01.txt"},{"inode":67595704,"pos":19,"file":"/export/data/flume/bigdata/test01.txt"},{"inode":67805657,"pos":7,"file":"/export/data/flume/bigdata/test02.txt"}
]
3.其他source
Kafka Source:监听读取Kafka数据
Spooldir Source:监控一个目录,只要这个目录中产生一个文件,就会采集一个文件
缺点:不能动态监控文件,被采集的文件是不能发生变化的
六、常见Channel mem Channel:将数据缓存在内存中
- 特点:读写快、容量小、安全性较差
- 应用:小数据量的高性能的传输
- 特点:读写相对慢、容量大、安全性较高
- 应用:数据量大,读写性能要求不高的场景下
capacity:缓存大小:指定Channel中最多存储多少条event
transactionCapacity:每次传输的大小
- 每次source最多放多少个event和每次sink最多取多少个event
- 这个值一般为capacity的十分之一,不能超过capacity
- Kafka Sink
- HDFS Sink
- 原因1:很多场景下,需要对数据提前做一步ETL,将ETL以后的结果再入库
- 原因2:Hive Sink有严格的要求,表必须为桶表,文件类型必须为orc
- 用HDFS sink代替Hive sink
问题:Flume作为HDFS客户端,写入HDFS数据
- Flume必须知道HDFS地址
- Flume必须拥有HDFS的jar包
- 方式一:Flume写地址的时候,指定HDFS的绝对地址
hdfs://node1:8020/nginx/log
手动将需要的jar包放入Flume的lib目录下
- 方式二:在Flume中配置Hadoop的环境变量,将core-site和hdfs-site放入Flume的配置文件目录
cp hive-mem-log.properties hive-mem-hdfs.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log #定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test1#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1
启动:
flume-ng agent -c conf/ -f usercase/hive-mem-hdfs.properties -n a1 -Dflume.root.logger=INFO,console
文章图片
文章图片
指定文件大小
- 问题:Flume默认写入HDFS上会产生很多小文件,都在1KB左右,不利用HDFS存储
- 解决:指定文件大小
hdfs.rollInterval 30每隔多长时间产生一个文件,单位为s
hdfs.rollSize1024每个文件多大产生一个文件,字节
hdfs.rollCount10多少个event生成一个文件
如果不想使用某种规则,需要关闭,设置为0
cp hive-mem-hdfs.properties hive-mem-size.properties
hive-mem-size.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log #定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test2
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 10240
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1
指定分区
cp hive-mem-hdfs.properties hive-mem-part.properties
运行:
flume-ng agent -c conf/ -f usercase/hive-mem-part.properties -n a1 -Dflume.root.logger=INFO,console
文章图片
其他参数
#指定生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = nginx
#指定生成的文件的后缀
a1.sinks.k1.hdfs.fileSuffix = .log
#指定写入HDFS的文件的类型:普通的文件
a1.sinks.k1.hdfs.fileType = DataStream
2.Flume架构和高级组件
Flume架构
- 1.多Sink
- 一个agent可以有多个source、channel、sink
- 多个sink架构中,为了每个sink都有一份完整数据,每个sink必须对应一个独立的channel
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
文章图片
- 2.Collect架构
- 两层Flume架构:如果大量并发直接写入HDFS,导致HDFS的IO负载比较高
- 第一层
- source:taildir source
- sink:avro sink
- 第二层
- source:avro source
- sink:HDFS sink
文章图片
高级组件
Flume Channel Selectors
- 功能: 用于决定source怎么将数据给channel
- 规则:默认:source默认将数据给每个channel一份
- Replicating Channel Selector (default)
- 选择:根据event头部的key值不同,给不同的channel
- Multiplexing Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Flume Interceptors:拦截器
- 功能:可以给event的头部添加KV,还可以对数据进行过滤
- 提供
- 1.Timestamp Interceptor:自动在每个event头部添加一个KV
- key:timestamp
- value:event产生的时间
- 1.Timestamp Interceptor:自动在每个event头部添加一个KV
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
- 2.Host Interceptor:自动在每个event头部添加一个KV
- key:host
- value:这个event所在的机器的名称
- 3.Static Interceptor:自动在每个event头部添加一个KV
- KV由用户自己指定
- 4.Regex Filtering Interceptor:正则过滤拦截器,判断数据是否符合正则表达式,不符合就直接过滤,不采集
-功能:实现collect架构中的高可用和负载均衡
- 高可用failover:两个sink,一个工作,一个不工作
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
- priority:权重越大,就先工作
- 负载均衡load_balance:两个sink,一起工作
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
- 分配策略:round_robin,random
文章图片
- 第一层必须有两个sink,作为一个整体,称为sink group
- 用于实现MySQL等RDBMS数据库与HDFS之间的数据导入与导出
导入与导出:相对HDFS而言 - 导入:将MySQL的数据导入到HDFS
- 导出:将HDFS的数据导出到MySQL
- 底层就是MapReduce程序(大多数都是三大阶段的MapReduce)
- 将Sqoop的程序转换成了MapReduce程序,提交给YARN运行,实现分布式采集
- 导入:MySQL -->> HDFS
- Input:DBInputFormat:读MySQL
- Output:TextOutputFormat:写HDFS
- 导出:HDFS -->> MySQL
- Input:TextInputFormat:读HDFS
- Output:DBOutputFormat:写MySQL
特点
- 必须依赖于Hadoop:MapReduce + YARN
- MapReduce是离线计算框架,Sqoop离线数据采集的工具,只能适合于离线业务平台
- 数据同步:定期将离线的数据进行采集同步到数据仓库中
- 全量:每次都采集所有数据
- 增量:每次只采集最新的数据,大部分都是增量处理
- 数据迁移:将历史数据【MySQL、Oracle】存储到HDFS中
- 全量:第一次一定是全量的
sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456
文章图片
九、Sqoop导入:HDFS 准备数据:
- MySQL创建数据库
create database sqoopTest;
use sqoopTest;
- MySQL创建数据表
CREATE TABLE `tb_tohdfs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL,
`age` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- MySQL插入数据
insert into tb_tohdfs values(null,"laoda",18);
insert into tb_tohdfs values(null,"laoer",19);
insert into tb_tohdfs values(null,"laosan",20);
insert into tb_tohdfs values(null,"laosi",21);
导入语法:
#查看sqoop import帮助
sqoop import --help
- 指定数据源:MySQL
- url
- username
- password
- table
- 指定目标地:HDFS
- 指定写入的位置
- 需求1:将MySQL中tb_tohdfs表的数据导入HDFS的/sqoop/import/test01目录中
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test01
文章图片
- 需求2:将tb_tohdfs表的id和name导入HDFS的/sqoop/import/test01目录,并且用制表符分隔
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--columns id,name \
--delete-target-dir\
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
- -m:指定MapTask的个数
- –fields-terminated-by:用于指定输出的分隔符
- –columns:指定导入哪些列
- –delete-target-dir :提前删除输出目录
文章图片
- 需求3:将tb_tohdfs表中的id >2的数据导入HDFS的/sqoop/import/test01目录中
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--where 'id > 2' \
--delete-target-dir\
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
- –where :用于指定行的过滤条件
文章图片
- 需求4:将tb_tohdfs表中的id>2的数据中id和name两列导入/sqoop/import/test01目录中
- 方案1
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--columns id,name \
--where 'id > 2' \
--delete-target-dir\
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
- 方案2
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
-e 'select id,name from tb_tohdfs where id>2 and $CONDITIONS' \
--delete-target-dir\
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
- -e,–query :使用SQL语句读取数据.只要使用SQL语句,必须在where子句中加上$CONDITIONS
文章图片
use default;
create table fromsqoop(
id int,
name string,
age int
);
- 1.直接导入
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hive-import \
--hive-database default \
--hive-table fromsqoop \
--fields-terminated-by '\001' \
-m 1
- –hive-import \:表示导入Hive表
- 【大数据|大数据(Flume和Sqoop)】–hive-database default \:表示指定导入哪个Hive的数据库
- –hive-table fromsqoop \:表示指定导入哪个Hive的表
- –fields-terminated-by ‘\001’ \:指定Hive表的分隔符,一定要与Hive表的分隔符一致
文章图片
原理
- step1:将MySQL的数据通过MapReduce先导入HDFS
- step2:将HDFS上导入的这个文件通过load命令加载到了Hive表中
- 2.hcatalog导入
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hcatalog-database default \
--hcatalog-table fromsqoop \
--fields-terminated-by '\001' \
-m 1
原理
- step1:先获取Hive表的元数据
- step2:将Hive表的目录直接作为MapReduce输出
- 第一天:产生数据
+----+--------+-----+
|1 | laoda|18 |
|2 | laoer|19 |
|3 | laosan |20 |
|4 | laosi|21 |
+----+--------+-----+
- 第二天的0点:采集昨天的数据
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
- 第二天:产生新的数据
+----+--------+-----+
|5 | laowu|22 |
|6 | laoliu |23 |
|7 | laoqi|24 |
|8 | laoba|25 |
+----+--------+-----+
- 第三天:采集昨天的数据
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
- 每次导入都是所有的数据,每次都是全量采集,会造成数据重复
设计:用于对某一列值进行判断,只要大于上一次的值就会被导入
参数
Incremental import arguments:
--check-column Source column to check for incremental
change
--incremental Define an incremental import of type
'append' or 'lastmodified'
--last-value Last imported value in the incremental
check column
- –check-column :按照哪一列进行增量导入
- –last-value:用于指定上一次的值
- –incremental:增量的方式
- append
- lastmodified
- append
- 要求:必须有一列自增的值,按照自增的int值进行判断
- 特点:只能导入新增的数据,无法导入更新的数据
- 测试
- 第一次采集
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 1 \
-m 1
文章图片
- 插入新的数据
insert into tb_tohdfs values(null,"laowu",22);
insert into tb_tohdfs values(null,"laoliu",23);
insert into tb_tohdfs values(null,"laoqi",24);
insert into tb_tohdfs values(null,"laoba",25);
- 第二次采集
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 4 \
-m 1
文章图片
2.lastmodifield
- 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
- 特点:既导入新增的数据也导入更新的数据
- 测试
- MySQL中创建测试数据
CREATE TABLE `tb_lastmode` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(200) NOT NULL,
`lastmode` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMPON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into tb_lastmode values(null,'hadoop',null);
insert into tb_lastmode values(null,'spark',null);
insert into tb_lastmode values(null,'hbase',null);
- 第一次采集
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-05-12 21:55:30' \
-m 1
文章图片
- 数据发生变化
insert into tb_lastmode values(null,'hive',null);
update tb_lastmode set word = 'sqoop' where id = 1;
第二次采集
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--merge-key id \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-05-12 22:01:47' \
-m 1
- –merge-key :按照id进行合并
文章图片
特殊方式
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 12 and $CONDITIONS' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
- 要求:必须每次将最新导入的数据放到一个目录单独存储,不能相同
- MySQL中创建测试表
use sqoopTest;
CREATE TABLE `tb_url` (
`id` int(11) NOT NULL,
`url` varchar(200) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Hive中创建表,并加载数据
vim /export/data/lateral.txt
1 http://facebook.com/path/p1.php?query=1
2 http://www.baidu.com/news/index.jsp?uuid=frank
3 http://www.jd.com/index?source=baiduuse default;
create table tb_url(
id int,
url string
) row format delimited fields terminated by '\t';
load data local inpath '/export/data/lateral.txt' into table tb_url;
文章图片
全量导出
sqoop export \
--connectjdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_url \
--export-dir /user/hive/warehouse/tb_url \
--input-fields-terminated-by '\t' \
-m 1
- –export-dir:指定导出的HDFS目录
- –input-fields-terminated-by :用于指定导出的HDFS文件的分隔符是什么
文章图片
- Hive中有一张结果表:存储每天分析的结果
--第一天:10号处理9号
iddaystrUVPVIP
12020-11-09100010000500insert into result
select id,daystr,uv,pv ,ip from datatable where daystr=昨天的日期
--第二天:11号处理10号
iddaystrUVPVIP
12020-11-09100010000500
22020-11-102000200001000
MySQL:存储每一天的结果
12020-11-09100010000500
增量导出方式
- updateonly:只增量导出更新的数据
- allowerinsert:既导出更新的数据,也导出新增的数据
- 修改lateral.txt数据
1 http://www.itcast.com/path/p1.php?query=1
2 http://www.baidu.com/news/index.jsp?uuid=frank
3 http://www.jd.com/index?source=baidu
4 http://www.heima.com
- 重新加载覆盖
load data local inpath '/export/data/lateral.txt' overwrite into table tb_url;
文章图片
- 增量导出
sqoop export \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_url \
--export-dir /user/hive/warehouse/tb_url \
--input-fields-terminated-by '\t' \
--update-key id \
--update-mode updateonly \
-m 1;
文章图片
2.allowerinsert
- 修改lateral.txt
1 http://bigdata.itcast.com/path/p1.php?query=1
2 http://www.baidu.com/news/index.jsp?uuid=frank
3 http://www.jd.com/index?source=baidu
4 http://www.heima.com
- 覆盖表中数据
load data local inpath '/export/data/lateral.txt' overwrite into table tb_url;
文章图片
- 增量导出
sqoop export \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_url \
--export-dir /user/hive/warehouse/tb_url \
--input-fields-terminated-by '\t' \
--update-key id \
--update-mode allowinsert \
-m 1
文章图片
十四、Sqoop Job
- 增量导入的问题
- 增量导入每次都要手动修改上次的值执行,怎么解决?
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test04 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 4 \
-m 1
- Sqoop Job的使用
insert into tb_tohdfs values(null,'laojiu',26);
insert into tb_tohdfs values(null,'laoshi',27);
- 创建job
sqoop job --create job01 \
-- import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test04 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 8 \
-m 1
- 创建job,不会运行程序,只是在元数据中记录信息
- 列举job
sqoop job --list
- 查看job的信息
sqoop job --show jobName
- 运行job
sqoop job --exec jobName
- 删除job
sqoop job --delete jobName
运行job01
sqoop job --exec job01
文章图片
插入新数据
insert into tb_tohdfs values(null,'laoshiyi',28);
insert into tb_tohdfs values(null,'laoshier',29);
运行job01
sqoop job --exec job01
文章图片
十五、Sqoop密码问题与脚本封装
- 如何解决手动输入密码和密码明文问题?
- 1:在sqoop的sqoop-site.xml中配置将密码存储在客户端中
- 2:将密码存储在文件中,通过文件的权限来管理密码
sqoop job --create job02 \
-- import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password-file file:///export/data/sqoop.passwd \
--table tb_tohdfs \
--target-dir /sqoop/import/test05 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 4 \
-m 1
- –password-file
- 读取的是HDFS文件,这个文件中只能有一行密码(通过notepad++编辑)
#mysql密码
123456
文章图片
Sqoop封装脚本
- 如何封装Sqoop的代码到文件中?
- step1:将代码封装到一个文件中
vim /export/data/test.sqoop
import
--connect
jdbc:mysql://node3:3306/sqoopTest
--username
root
--password-file
file:///export/data/sqoop.passwd
--table
tb_tohdfs
--target-dir
/sqoop/import/test05
--fields-terminated-by
'\t'
-m
1
- 要求:一行只放一个参数
- step2:运行这个文件
sqoop --options-file /export/data/test.sqoop
推荐阅读
- 网络|Kubernetes核心概念总结
- 深度学习|#今日论文推荐#思考总结10年,图灵奖得主Yann LeCun指明下一代AI方向(自主机器智能)
- 服务器|GBase 8a集群启停工具
- #|jpa、hibernate、spring-data-jpa关系
- 大数据|2021世界机器人大赛— 青少年机器人设计大赛
- 基于开源流批一体数据同步引擎ChunJun数据还原—DDL解析模块的实战分享
- JAVA|初步认识JAVA
- 数据库|围剿慢SQL,工行MySQL研发管控和治理实践
- 大数据|刚转行的运营人做哪些副业更简单,并且更赚钱()