Flume | Introduction and Architecture + Installation + Developing Flume Jobs + Writing a Flume Interceptor + Loading Tracking Data into Hive

Contents

I. What is Flume? + Project Background
1. Flume Introduction
II. Flume Architecture
1. Core Components
(1) Agent
(2) Source
(3) Channel
(4) Sink
(5) Event
2. Workflow
3. Flume Transactions
4. Does Flume Lose Data?
III. Flume Installation + First Steps
1. Download apache-flume-1.9.0-bin.tar.gz from the official site, upload it to the server, and extract it to the target path
2. Configure flume-env.sh under conf
3. Run the example from the official documentation
(1) The official command
(2) Adapted to our environment
(3) Open a second terminal
(4) Start the agent
(5) Open a third terminal and connect
IV. Developing Flume Jobs
1. spooldir source: monitor a local directory and upload to HDFS
(1) Create the file spooldir-flume-hdfs.conf
(2) Start the agent
(3) Copy a file into the monitored directory
(4) Check the HDFS directory we configured
2. taildir source: monitor local files and upload to HDFS
(1) Create the file taildir-flume-hdfs.conf
(2) Start the agent
(3) Copy a file into the monitored directory
(4) Check the corresponding HDFS directory
V. Writing a Flume Interceptor
1. Steps for a custom Flume interceptor
2. pom file configuration
3. Interceptor code
(1) The code
(2) Package it with mvn package
4. Put the generated jar into Flume's lib directory
5. Edit the taildir-hdfs.conf file
6. Start the agent
7. Copy in a file to test
8. Check HDFS
9. Handle the different log types: create a new file taildir-hdfs-all.conf
10. Start the agent
11. Test that each type loads correctly
(1) page type
(2) actions type
(3) login type
(4) exposure type
12. Write a Flume start/stop script
13. Test the script
VI. Loading the Data into Hive Tables
1. Write a script that loads the data into the tables under the project database in Hive
2. Run the script
3. Check the data in the Hive tables

I. What is Flume? + Project Background
1. Flume Introduction
Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data, built on a streaming architecture.
In this project, Flume reads the tracking (event) data generated in a local directory on the server in real time and writes it to HDFS in real time.

Some companies run dozens or even hundreds of web servers.
The overall pipeline may look like this:


II. Flume Architecture

1. Core Components
(1) Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events.
It consists of three main parts: Source, Channel, and Sink.
(2) Source
The Source is the component that receives data into the Flume Agent.
Built-in source types include avro, thrift, exec, jms, spooling directory, netcat, taildir, sequence generator, syslog, http, and legacy.
(3) Channel
The Channel is a buffer between the Source and the Sink, which allows the Source and the Sink to run at different rates.
Note: one channel can feed multiple sinks, but each sink reads from exactly one channel.
Flume ships with two channel types: Memory Channel and File Channel.
The Memory Channel is an in-memory queue. It is suitable when losing data is acceptable; if data loss matters, it should not be used, because a process crash, machine failure, or restart will lose whatever is still in memory.
The File Channel writes all events to disk, so data is not lost when the process exits or the machine goes down.
(4) Sink
The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage system or forwards them to another Flume Agent.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, HBase, and solr.
(5) Event
The Event is Flume's basic unit of data transfer; data travels from source to destination in the form of events.
An Event consists of Headers and a Body: the Headers hold the event's attributes as key-value pairs, and the Body holds the payload itself as a byte array.


2. Workflow


(1) Data enters Flume through the Source; the Agent then moves it from source to destination in the form of events.
(2) The data is wrapped as events. An Event, the unit of transfer, consists of optional headers and a body (a byte array) that carries the data.
(3) Inside the agent, interceptors can do coarse filtering, for example excluding files or file types that should not be collected.
(4) The channel selector routes events. There are two kinds: the Replicating selector sends each event from the source to all channels, which suits multi-copy storage, while the Multiplexing selector sends each event to a specific channel, which suits targeted routing (see the sketch after this list).
(5) The events are written to the selected channel list.
(6) A SinkProcessor pulls data from the Channel. There are three kinds: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor. DefaultSinkProcessor drives a single Sink, while LoadBalancingSinkProcessor and FailoverSinkProcessor drive a Sink Group: the former provides load balancing and the latter provides failover.
(7) Finally the Sink writes the data out.
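The selector types and the failover sink processor map onto ordinary agent properties. The following is a minimal sketch only: the agent name a1, the channel/sink names, the demo file name, and the priorities are illustrative assumptions, not part of this project's configuration.

# Sketch: append selector and sink-group settings to a demo agent file
cat >> conf/selector-demo.conf <<'EOF'
# Replicating selector: every event goes to all listed channels (multi-copy storage)
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Multiplexing selector: route each event by a header value (targeted delivery)
#a1.sources.r1.selector.type = multiplexing
#a1.sources.r1.selector.header = type
#a1.sources.r1.selector.mapping.page = c1
#a1.sources.r1.selector.mapping.login = c2

# Failover sink group: k1 is preferred, k2 takes over when k1 fails
# (for load balancing instead, set processor.type = load_balance)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
EOF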

3. Flume Transactions

Put transaction (Source to Channel): doPut writes the data from the source into the temporary buffer putList; doCommit checks whether the channel's memory queue has enough room and, if so, merges the data in; doRollback rolls the data back when the channel's memory queue does not have enough room.
Take transaction (Channel to Sink): doTake pulls the data into the temporary buffer takeList; doCommit clears takeList once all the data has been sent successfully; doRollback returns the data in takeList to the channel's memory queue if an exception occurs while sending.

4. Does Flume Lose Data?
Source to Channel is transactional,
and Channel to Sink is transactional,
so neither of those hops can lose data.
The only place data can be lost is when the Channel is a MemoryChannel:
(1) if the agent crashes, the data held in memory is lost;
(2) if the Channel fills up, the Source stops writing, and the data that never made it in is lost.
A closer look:
Can Flume lose or duplicate data? It depends on the specific source, channel, and sink in use.
First, the source:
(1) An exec source running tail -f can lose data.
(2) A TailDir source does not lose data; it guarantees no data loss.
Next, the sink:
(1) An HDFS or Kafka sink may duplicate data, but it does not lose it.
In production the usual combination is a TailDir source with an HDFS sink, so data may be duplicated but is not lost.
Finally, the channel: to avoid losing data you still want the File channel; with a Memory channel, data can be lost if Flume goes down.

III. Flume Installation + First Steps
1. Download apache-flume-1.9.0-bin.tar.gz from the official site, upload it to the server, and extract it to the target path

2. Configure flume-env.sh under conf and add the Java path
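A minimal sketch of this step, assuming the JDK path that appears in the startup logs below (adjust to your own installation):

cd /home/peizk/app/flume-1.9.0/conf
cp flume-env.sh.template flume-env.sh
# point Flume at the JDK
echo 'export JAVA_HOME=/home/peizk/app/jdk1.8.0_212' >> flume-env.sh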

3. Run the example from the official documentation
(1) The official command is:



bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template


(2) Adapted to our environment:
bin/flume-ng agent -n peizk -c conf -f conf/flume-conf.properties.template

Run it on the server:
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent -n peizk -c conf -f conf/flume-conf.properties.template
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application -n peizk -f conf/flume-conf.properties.template
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

(3) Open a second terminal
and create a new file in the Flume directory:
[peizk@hadoop flume-1.9.0]$ vim example.conf

Paste in the configuration from the official documentation:
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4) Then start the agent:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console


--conf/-c: the configuration files live in the conf/ directory
--name/-n: names this agent a1
--conf-file/-f: the configuration file Flume reads for this run, here example.conf
-Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property at run time, sending log output to the console at INFO level. Log levels include debug, info, warn, and error.

(5) Open a third terminal and connect:
[peizk@hadoop flume-1.9.0]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

Type some data in:
[peizk@hadoop flume-1.9.0]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello peizk
OK

Check the second terminal:

That completes the official example: Flume listens on port 44444, and whatever is written to the port is picked up and delivered by the sink.

IV. Developing Flume Jobs
1. spooldir source: monitor a local directory and upload to HDFS
(1) Create the file spooldir-flume-hdfs.conf
with the following content:
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Configure the source
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/peizk/app/flume-1.9.0/test_log
a2.sources.r2.fileSuffix = .COMPLETED
a2.sources.r2.fileHeader = true
# Ignore (do not upload) files ending in .tmp
a2.sources.r2.ignorePattern = ([^ ]*\.tmp)

# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop:9000/user/flume/spooldir/%Y-%m-%d/
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = app-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
# The time unit
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Wire source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

(2) Start the agent:
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/spooldir-flume-hdfs.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a2 --conf-file job/spooldir-flume-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

(3) Copy a file into the monitored directory:
[peizk@hadoop test_log]$ cp ../logs/flume.log ./

(4) Check the HDFS directory we configured
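A quick way to check from the shell as well (a sketch; the date in the path is whichever day the file was collected, matching the %Y-%m-%d directory set in the sink):

hdfs dfs -ls /user/flume/spooldir/$(date +%Y-%m-%d)/
hdfs dfs -cat /user/flume/spooldir/$(date +%Y-%m-%d)/app-* | head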

Success!

2. taildir source: monitor local files and upload to HDFS
(1) Create the file taildir-flume-hdfs.conf
with the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /home/peizk/app/flume-1.9.0/taildir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/.*page.*
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = page-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
# The time unit
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Configure the channel
# Channel type
a3.channels.c3.type = memory
# Maximum number of events stored in the channel
a3.channels.c3.capacity = 1000
# Maximum number of events per transaction handed from the channel to a sink
a3.channels.c3.transactionCapacity = 100

# Wire source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

(2) Start the agent:
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a3 --conf-file job/taildir-flume-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

(3) Copy a file into the monitored directory:
[peizk@hadoop test_log]$ cp ~/testpage.log ./


(4) Check the corresponding HDFS directory
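To confirm from the shell, you can also inspect the taildir position file and the HDFS output (a sketch; the date is the collection day):

cat /home/peizk/app/flume-1.9.0/taildir.json
hdfs dfs -ls /user/flume/app-page/$(date +%Y-%m-%d)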


V. Writing a Flume Interceptor
1. Steps for a custom Flume interceptor
  • 1) Implement the interface org.apache.flume.interceptor.Interceptor.
  • 2) Implement its four methods: initialize() for setup, intercept(Event event) for single-event interception, intercept(List<Event> events) for batch interception, and close() to release resources.
  • 3) Create a static inner class Builder that implements Interceptor.Builder. The custom interceptor class is never instantiated directly with new; instead it is referenced in the Flume configuration file, and Flume uses the static Builder class from that configuration to create the interceptor object indirectly.
An interceptor processes events one at a time. It is initialized when the agent starts: initialize() opens any resources, close() releases them, and each event consists of headers plus a body.

2. pom file configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flume</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>6</maven.compiler.source>
        <maven.compiler.target>6</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>
</project>


3. Interceptor code
(1) The code:
package sm.flume;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;

public class LogInterceptor implements Interceptor {

    public void initialize() {
    }

    public Event intercept(Event event) {
        // Parse the JSON body and copy the ts and type fields into the event headers,
        // so the HDFS sink can resolve %Y-%m-%d and the channel selector can route on type.
        byte[] body = event.getBody();
        String s = new String(body, Charset.forName("utf-8"));
        JSONObject jsonObject = JSONObject.parseObject(s);
        String timestamp = jsonObject.getString("ts");
        String type = jsonObject.getString("type");
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", timestamp);
        headers.put("type", type);
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

(2) Package it with mvn package
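From the command line this is roughly the following (a sketch; the project path is hypothetical, and the jar name follows from the artifactId/version and the jar-with-dependencies assembly above, so check what actually lands in target/):

cd ~/IdeaProjects/Flume        # hypothetical path of the interceptor project
mvn clean package
ls target/*.jar                # expect something like Flume-1.0-jar-with-dependencies.jar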


4. Put the generated jar into Flume's lib directory
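For example (a sketch, assuming the jar name from the previous step):

cp target/Flume-1.0-jar-with-dependencies.jar /home/peizk/app/flume-1.9.0/lib/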


5. Edit the taildir-hdfs.conf file
[peizk@hadoop job]$ vim taildir-hdfs.conf

with the following content:
a4.sources = r1
a4.channels = c1
a4.sinks = k1

# Configure the source
a4.sources.r1.type = TAILDIR
a4.sources.r1.positionFile = /home/peizk/app/flume-1.9.0/taildir_hdfs.json
a4.sources.r1.filegroups = f1
a4.sources.r1.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/app.*
a4.sources.r1.ignorePattern = ([^ ]*\.tmp)
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = sm.flume.LogInterceptor$Builder

# Configure the sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d
# Prefix for uploaded files
a4.sinks.k1.hdfs.filePrefix = page-
# Whether to roll folders based on time
a4.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a4.sinks.k1.hdfs.roundValue = 1
# The time unit
a4.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp (disabled: the timestamp now comes from the interceptor header)
#a4.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a4.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a4.sinks.k1.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a4.sinks.k1.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a4.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a4.sinks.k1.hdfs.rollCount = 0

# Configure the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# Wire source and sink to the channel
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

6. Start the agent:
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a4 --conf-file job/taildir-hdfs.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a4 --conf-file job/taildir-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

7. Copy in a file to test:
[peizk@hadoop test_log]$ cp ../log_data/page_data/app_page_log_20220101.log ./

8. Check HDFS


The timestamp has changed. Success!

9. Handle the different log types: create a new file taildir-hdfs-all.conf
[peizk@hadoop job]$ vim taildir-hdfs-all.conf

with the following content:
a4.sources = r1
a4.channels = c1 c2 c3 c4
a4.sinks = k1 k2 k3 k4

# Configure the source
a4.sources.r1.type = TAILDIR
a4.sources.r1.positionFile = /home/peizk/app/flume-1.9.0/taildir_hdfs_all.json
a4.sources.r1.filegroups = f1
a4.sources.r1.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/app.*
a4.sources.r1.ignorePattern = ([^ ]*\.tmp)
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = sm.flume.LogInterceptor$Builder
a4.sources.r1.selector.type = multiplexing
a4.sources.r1.selector.header = type
a4.sources.r1.selector.mapping.page = c1
a4.sources.r1.selector.mapping.exposure = c2
a4.sources.r1.selector.mapping.actions = c3
a4.sources.r1.selector.mapping.login = c4

# Configure the sinks
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d
# Prefix for uploaded files
a4.sinks.k1.hdfs.filePrefix = page-
# Whether to roll folders based on time
a4.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a4.sinks.k1.hdfs.roundValue = 1
# The time unit
a4.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp (the timestamp comes from the interceptor header)
#a4.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a4.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a4.sinks.k1.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a4.sinks.k1.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a4.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a4.sinks.k1.hdfs.rollCount = 0

a4.sinks.k2.type = hdfs
a4.sinks.k2.hdfs.path = hdfs://hadoop:9000/user/flume/app-exposure/%Y-%m-%d
# Prefix for uploaded files
a4.sinks.k2.hdfs.filePrefix = exposure-
# Whether to roll folders based on time
a4.sinks.k2.hdfs.round = true
# How many time units before creating a new folder
a4.sinks.k2.hdfs.roundValue = 1
# The time unit
a4.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
#a4.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a4.sinks.k2.hdfs.batchSize = 100
# File type; compression is supported
a4.sinks.k2.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a4.sinks.k2.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a4.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a4.sinks.k2.hdfs.rollCount = 0

a4.sinks.k3.type = hdfs
a4.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/flume/app-actions/%Y-%m-%d
# Prefix for uploaded files
a4.sinks.k3.hdfs.filePrefix = actions-
# Whether to roll folders based on time
a4.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
a4.sinks.k3.hdfs.roundValue = 1
# The time unit
a4.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
#a4.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a4.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a4.sinks.k3.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a4.sinks.k3.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a4.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a4.sinks.k3.hdfs.rollCount = 0

a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://hadoop:9000/user/flume/app-login/%Y-%m-%d
# Prefix for uploaded files
a4.sinks.k4.hdfs.filePrefix = login-
# Whether to roll folders based on time
a4.sinks.k4.hdfs.round = true
# How many time units before creating a new folder
a4.sinks.k4.hdfs.roundValue = 1
# The time unit
a4.sinks.k4.hdfs.roundUnit = hour
# Whether to use the local timestamp
#a4.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a4.sinks.k4.hdfs.batchSize = 100
# File type; compression is supported
a4.sinks.k4.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a4.sinks.k4.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a4.sinks.k4.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a4.sinks.k4.hdfs.rollCount = 0

# Configure the channels
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

a4.channels.c2.type = memory
a4.channels.c2.capacity = 1000
a4.channels.c2.transactionCapacity = 100

a4.channels.c3.type = memory
a4.channels.c3.capacity = 1000
a4.channels.c3.transactionCapacity = 100

a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

# Wire source and sinks to the channels
a4.sources.r1.channels = c1 c2 c3 c4
a4.sinks.k1.channel = c1
a4.sinks.k2.channel = c2
a4.sinks.k3.channel = c3
a4.sinks.k4.channel = c4


10. Start the agent:
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a4 --conf-file job/taildir-hdfs-all.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a4 --conf-file job/taildir-hdfs-all.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

11. Test that each type loads correctly
(1) page type
[peizk@hadoop test_log]$ cp ../log_data/page_data/app_page_log_20220101.log ./


(2) actions type
[peizk@hadoop test_log]$ cp ../log_data/actions_data/app_actions_log_20220101.log ./



(3) login type
[peizk@hadoop test_log]$ cp ../log_data/login_data/app_login_log_20220101.log ./



(4) exposure type
[peizk@hadoop test_log]$ cp ../log_data/exposure_data/app_exposure_log_20220101.log ./


12. Write a Flume start/stop script:
[peizk@hadoop bin]$ vim flum.sh

#!/bin/bash

case $1 in
"start"){
    for i in hadoop
    do
        echo " --------starting collection flume on $i-------"
        # the agent defined in taildir-hdfs-all.conf is named a4
        ssh $i "nohup /home/peizk/app/flume-1.9.0/bin/flume-ng agent --conf-file /home/peizk/app/flume-1.9.0/job/taildir-hdfs-all.conf --name a4 -Dflume.root.logger=INFO,LOGFILE 2>&1 &"
    done
};;
"stop"){
    for i in hadoop
    do
        echo " --------stopping collection flume on $i-------"
        ssh $i "ps -ef | grep taildir-hdfs-all.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
    done
};;
esac

13. Test the script
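One way to exercise it (a sketch; the grep pattern mirrors the one used inside the script):

cd ~/bin                                              # assuming the script was saved here
sh flum.sh start
ps -ef | grep taildir-hdfs-all.conf | grep -v grep    # the agent process should appear
sh flum.sh stop
ps -ef | grep taildir-hdfs-all.conf | grep -v grep    # and should now be gone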
VI. Loading the Data into Hive Tables
1. Write a script that loads the data into the tables under the project database in Hive
with the following content:
#!/bin/bash
# If a date is passed in, etl_date is that date; otherwise it defaults to yesterday
if [ -n "$2" ] ; then
    etl_date=$2
else
    etl_date=`date -d "-1 day" +%F`
fi

case $1 in
"page")
    echo "------------loading partition ${etl_date} of table $1----------------"
    hive -e "load data inpath '/user/flume/app-page/${etl_date}/*' into table project.ods_log_page_incr partition(pt='${etl_date}')"
;;
"actions")
    echo "------------loading partition ${etl_date} of table $1----------------"
    hive -e "load data inpath '/user/flume/app-actions/${etl_date}/*' into table project.ods_log_actions_incr partition(pt='${etl_date}')"
;;
"login")
    echo "------------loading partition ${etl_date} of table $1----------------"
    hive -e "load data inpath '/user/flume/app-login/${etl_date}/*' into table project.ods_log_login_incr partition(pt='${etl_date}')"
;;
"exposure")
    echo "------------loading partition ${etl_date} of table $1----------------"
    hive -e "load data inpath '/user/flume/app-exposure/${etl_date}/*' into table project.ods_log_exposure_incr partition(pt='${etl_date}')"
;;
"all")
    echo "------------loading partition ${etl_date} of all log tables----------------"
    hive -e "load data inpath '/user/flume/app-page/${etl_date}/*' into table project.ods_log_page_incr partition(pt='${etl_date}')"
    hive -e "load data inpath '/user/flume/app-actions/${etl_date}/*' into table project.ods_log_actions_incr partition(pt='${etl_date}')"
    hive -e "load data inpath '/user/flume/app-login/${etl_date}/*' into table project.ods_log_login_incr partition(pt='${etl_date}')"
    hive -e "load data inpath '/user/flume/app-exposure/${etl_date}/*' into table project.ods_log_exposure_incr partition(pt='${etl_date}')"
;;
esac

2. Run the script
[peizk@hadoop job]$ sh load_hive.sh all 2022-01-01
------------loading partition 2022-01-01 of all log tables----------------
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 0d0cd9d8-c692-49d0-856b-2b3b12d0969f
Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 76e99fe1-f958-4264-b171-d7732719b551
Loading data to table project.ods_log_page_incr partition (pt=2022-01-01)
OK
Time taken: 1.962 seconds
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 2e8f8319-c819-4d0e-85d6-44bcd913fee5
Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 81624ba6-94cd-43ae-8938-1c9ac040bb70
Loading data to table project.ods_log_actions_incr partition (pt=2022-01-01)
OK
Time taken: 1.936 seconds
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 898ac37e-0919-4d31-9870-574d5c5fdf00
Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 43e6b9e3-4cb7-4590-a2e2-0e23731f28e8
Loading data to table project.ods_log_login_incr partition (pt=2022-01-01)
OK
Time taken: 1.952 seconds
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 810102d2-3d20-4bcd-a0c2-34006c2b7ea2
Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 533d6173-a204-49c1-a654-a6d3ccd23c66
Loading data to table project.ods_log_exposure_incr partition (pt=2022-01-01)
OK
Time taken: 1.927 seconds

3. Check the data in the Hive tables
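A quick check from the command line (a sketch; the table and partition names come from the load script above):

hive -e "select count(*) from project.ods_log_page_incr where pt='2022-01-01'"
hive -e "select * from project.ods_log_page_incr where pt='2022-01-01' limit 5"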

Import successful!!!
