目录
一、Flume是什么?+项目背景
1.Flume介绍
二、Flume基础架构
1、核心组件
(1)Agent
(2)Source
(3)channel
(4)Sink
(5)Event
2、工作流程
3、Flume的事务性
4、 Flume会丢失数据吗?
三、Flume安装+基本探索
1、从官网下载apache-flume-1.9.0-bin.tar.gz,放到服务器并解压至相关路径
2、 配置conf下的flume-env.sh
3、实现一个官网例子
(1)官网如下
(2)我们修改为自己的执行语句
(3)再另开一个窗口
(4)接着启动监控
(5)再接着启动一个界面,接着启动
四、开发Flume脚本
1、spooldir模式监控本地文件上传HDFS
(1)创建文件 spooldir-flume-hdfs.conf
(2)启动配置监控
(3)往我们的监控文件夹下cp一个文件
(4)查看我们的hdfs相应设置的目录
2、taildir模式监控本地文件上传HDFS
(1)创建文件 taildir-flume-hdfs.conf
(2)启动配置监控
(3)往我们的监控文件夹下cp一个文件
(4)查看相应HDFS
五、 编写Flume拦截器
1、flume自定义拦截器步骤
2.pom文件配置
3、编写拦截器代码
(1)代码如下:
(2)使用 package 打包
4、将生成的jar放入 Flume下的lib里
5、编辑taildir-hdfs.conf文件
6、开启文件监控
7、传一个文件试一下
8、查看下HDFS
9、添加对不同类型的更改 新建文件 taildir-hdfs-all.conf
10、启动脚本监控
11、测试各类型是否可以正常加载
(1)页面类型
(2)action类型
(3)login 类型
(4)exposure 类型
12、编写一个flume 启停脚本 如下
13、测试一下脚本
六、将数据加载到hive表
1、编写加载到hive的project库下表的脚本
2、执行脚本
3、查看 hive表中数据
一、Flume是什么?+项目背景
1.Flume介绍 Flume 基于流式架构是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
本项目 Flume 实时读取服务器本地目录下生成的埋点数据,将数据实时写入到HDFS。
文章图片
有的公司涉及几十甚至上百的的web服务器
操作流程可能如下:
文章图片
二、Flume基础架构
文章图片
1、核心组件
(1)Agent
Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目标地。
主要有 3 个部分组成,Source、Channel、Sink。
(2)Source
Source 是负责接收数据到 Flume Agent 的组件。
主要包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。
(3)channel
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。
注:一个channel可对应多个sink,但是一个sink只能对应一个channel。
Flume 自带两种 Channel:Memory Channel 和 File Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
(4)Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr。
(5)Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
Event 由 Headers 和 Body 两部分组成,Headers 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组
文章图片
2、工作流程
文章图片
(1)数据通过Source采集进入Flume,Flume以通过Agent以事件的形式将数据从源头到目的地。
(2)进入事件处理,Event为传输单元,由可选的header和载有数据的body(byte array)构成。
(3)在agent可对数据进行粗步拦截,排除某些不采集的文件,文件类型。
(4)Channel选择器,两种Channel Selector,一种是Replicating channel另一种是Multiplexing Channel,前者将source的event发往左右的channel,适用于数据多副本存储,后者将event发往指定的channel,适用于数据定向发送。
(5)将event写入对应channel列表
(6)SinkProcessor从Channel中拉数据
分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor
DefaultSinkProcessor 对 应 的 是 单 个 的 Sink , LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能
(7)最后把数据Sink出去
3、Flume的事务性
文章图片
【flume|Flume介绍、基础架构+Flume安装+Flume开发脚本+编写Flume拦截器+埋点数据装载到Hive】Put事务:Source到Channel doPut:将数据从souce写入临时缓冲区putList doCommit:检查Channel内存队列是否足够合并 doRollback:channel 内存队列空间不足,则回滚数据
take事务:channel到sink doTake:将数据取到临时缓冲区takeList doCommit:如果数据全部发送成功,则清除临时缓冲区takeList doRollback:数据发送过程中如果出现异常,rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列
4、 Flume会丢失数据吗? Source到Channel是事务性的,
Channel到Sink也是事务性的,
这两个环节都不可能丢失数据。
唯一可能丢失数据的是Channel采用MemoryChannel,
(1)在agent宕机时候导致数据在内存中丢失;
(2)Channel存储数据已满,导致Source不再写入数据,造成未写入的数据丢失;
具体分析:
flume传输是否会丢失或重复数据? ? 这个问题需要分情况来看,需要结合具体使用的source、channel和sink来分析。
首先,分析source:
(1)exec source ,后面接 tail -f ,这个数据也是有可能丢的。
(2)TailDir source ,这个是不会丢数据的,它可以保证数据不丢失。
其次,分析sink:
(1)hdfs/kakfa sink,数据有可能重复,但是不会丢失。
一般生产过程中,都是使用 TailDir source 和 HDFS sink,所以数据会重复但是不会丢失。
最后,分析channel 要想数据不丢失的话,还是要用 File channel,而memory channel 在flume挂掉的时候还是有可能造成数据的丢失的。
三、Flume安装+基本探索
1、从官网下载apache-flume-1.9.0-bin.tar.gz,放到服务器并解压至相关路径
文章图片
2、 配置conf下的flume-env.sh 将java路径添加进去
文章图片
3、实现一个官网例子
(1)官网如下
文章图片
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
(2)我们修改为自己的执行语句
bin/flume-ng agent -n peizk -c conf -f conf/flume-conf.properties.template
在服务器执行
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent -n peizk -c conf -f conf/flume-conf.properties.template
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application -n peizk -f conf/flume-conf.properties.template
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(3)再另开一个窗口
在flume目录下新建一个文件
[peizk@hadoop flume-1.9.0]$ vim example.conf
将我们官网代码放入
# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)接着启动监控
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
--conf/-c:表示配置文件存储在 conf/目录
--name/-n:表示给 agent 起名为 a1
--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf
文件。-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger
参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error
(5)再接着启动一个界面,接着启动
[peizk@hadoop flume-1.9.0]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
写一些数据进去
[peizk@hadoop flume-1.9.0]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello peizk
OK
查看 第二个窗口
文章图片
完成官网例子,通过flume监控4444端口,往里存什么,拿什么
四、开发Flume脚本 1、spooldir模式监控本地文件上传HDFS (1)创建文件 spooldir-flume-hdfs.conf
内容如下:
a2.sources = r2
a2.sinks = k2
a2.channels = c2
#配置source
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/peizk/app/flume-1.9.0/test_log
a2.sources.r2.fileSuffix = .COMPLETED
a2.sources.r2.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a2.sources.r2.ignorePattern = ([^ ]*\.tmp)
#配置sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop:9000/user/flume/spooldir/%Y-%m-%d/
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = app-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = https://www.it610.com/article/1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
#配置channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#配置关联关系
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(2)启动配置监控
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/spooldir-flume-hdfs.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a2 --conf-file job/spooldir-flume-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(3)往我们的监控文件夹下cp一个文件
[peizk@hadoop test_log]$ cp ../logs/flume.log./
(4)查看我们的hdfs相应设置的目录
文章图片
成功!
2、taildir模式监控本地文件上传HDFS (1)创建文件 taildir-flume-hdfs.conf
内容如下:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# 配置source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /home/peizk/app/flume-1.9.0/taildir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/.*page.*
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# 配置sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = page-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = https://www.it610.com/article/1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
#配置channel
#channe类型
a3.channels.c3.type = memory
#储存在channel中最大event数量
a3.channels.c3.capacity = 1000
#从channel中去给一个sink,每个事务中最大的事件数
a3.channels.c3.transactionCapacity = 100
#配置关联关系
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动配置监控
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a3 --conf-file job/taildir-flume-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(3)往我们的监控文件夹下cp一个文件
[peizk@hadoop test_log]$ cp ~/testpage.log ./
(4)查看相应HDFS
文章图片
五、 编写Flume拦截器 1、flume自定义拦截器步骤
- 1)继承接口:org.apache.flume.interceptor.Interceptor
- 2)实现接口中的4个抽象方法:
初始化 initialize()、单个事件拦截 intercept(Event event)、批量事件拦截 intercept(List
events)、关闭io流 close()
- 3)创建一个静态内部类Builder,并实现接口
implements Interceptor.Builder
。我们自定义的拦截器这个类,没有办法直接new,而是在flume的配置文件中进行配置,通过配置文件调用静态内部类,来间接地调用自定义的拦截器对象。
2.pom文件配置
4.0.0 org.example
Flume
1.0 org.apache.maven.plugins
maven-compiler-plugin
6
6
maven-compiler-plugin
2.3.2
1.8
1.8
maven-assembly-plugin
jar-with-dependencies
make-assembly package
single
org.apache.flume
flume-ng-core
1.9.0
provided
com.alibaba
fastjson
1.2.4
3、编写拦截器代码 (1)代码如下:
package sm.flume;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
public class LogInterceptor implements Interceptor {public void initialize() {}public Event intercept(Event event) {
byte[] body = event.getBody();
String s = new String(body, Charset.forName("utf-8"));
JSONObject jsonObject = JSONObject.parseObject(s);
String timestamp = jsonObject.getString("ts");
String type = jsonObject.getString("type");
Map headers = event.getHeaders();
headers.put("timestamp",timestamp);
headers.put("type",type);
return event;
}public List intercept(List events) {
for (Event event : events) {
intercept(event);
}return events;
}public void close() {}
publicstaticclass Builder implements Interceptor.Builder{@Override
public Interceptor build() {
return new LogInterceptor();
}@Override
public void configure(Context context) {}
}
}
(2)使用 package 打包
文章图片
4、将生成的jar放入 Flume下的lib里
文章图片
5、编辑taildir-hdfs.conf文件
[peizk@hadoop job]$ vim taildir-hdfs.conf
内容为:
a4.sources = r1
a4.channels = c1
a4.sinks = k1
# 配置source
a4.sources.r1.type = TAILDIR
a4.sources.r1.positionFile = /home/peizk/app/flume-1.9.0/taildir_hdfs.json
a4.sources.r1.filegroups = f1
a4.sources.r1.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/app.*
a4.sources.r1.ignorePattern = ([^ ]*\.tmp)
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = sm.flume.LogInterceptor$Builder# 配置sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d
#上传文件的前缀
a4.sinks.k1.hdfs.filePrefix = page-
#是否按照时间滚动文件夹
a4.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k1.hdfs.roundValue = https://www.it610.com/article/1
#重新定义时间单位
a4.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
#a4.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k1.hdfs.rollCount = 0#配置channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100#配置关联关系
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
6、开启文件监控
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a4 --conf-file job/taildir-hdfs.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a4 --conf-file job/taildir-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
7、传一个文件试一下
[peizk@hadoop test_log]$ cp ../log_data/page_data/app_page_log_20220101.log ./
8、查看下HDFS
文章图片
时间戳已改变,成功!
9、添加对不同类型的更改 新建文件 taildir-hdfs-all.conf
[peizk@hadoop job]$ vim taildir-hdfs-all.conf
内容如下:
a4.sources = r1
a4.channels = c1 c2 c3 c4
a4.sinks = k1 k2 k3 k4
# 配置source
a4.sources.r1.type = TAILDIR
a4.sources.r1.positionFile = /home/peizk/app/flume-1.9.0/taildir_hdfs_all.json
a4.sources.r1.filegroups = f1
a4.sources.r1.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/app.*
a4.sources.r1.ignorePattern = ([^ ]*\.tmp)
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = sm.flume.LogInterceptor$Builder
a4.sources.r1.selector.type = multiplexing
a4.sources.r1.selector.header = type
a4.sources.r1.selector.mapping.page = c1
a4.sources.r1.selector.mapping.exposure = c2
a4.sources.r1.selector.mapping.actions = c3
a4.sources.r1.selector.mapping.login = c4
# 配置sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d
#上传文件的前缀
a4.sinks.k1.hdfs.filePrefix = page-
#是否按照时间滚动文件夹
a4.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k1.hdfs.roundValue = https://www.it610.com/article/1
#重新定义时间单位
a4.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
#a4.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k1.hdfs.rollCount = 0a4.sinks.k2.type = hdfs
a4.sinks.k2.hdfs.path = hdfs://hadoop:9000/user/flume/app-exposure/%Y-%m-%d
#上传文件的前缀
a4.sinks.k2.hdfs.filePrefix = exposure-
#是否按照时间滚动文件夹
a4.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a4.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
#a4.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k2.hdfs.rollCount = 0a4.sinks.k3.type = hdfs
a4.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/flume/app-actions/%Y-%m-%d
#上传文件的前缀
a4.sinks.k3.hdfs.filePrefix = actions-
#是否按照时间滚动文件夹
a4.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a4.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
#a4.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k3.hdfs.rollCount = 0a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://hadoop:9000/user/flume/app-login/%Y-%m-%d
#上传文件的前缀
a4.sinks.k4.hdfs.filePrefix = login-
#是否按照时间滚动文件夹
a4.sinks.k4.hdfs.round = true
#多少时间单位创建一个新的文件夹
a4.sinks.k4.hdfs.roundValue = 1
#重新定义时间单位
a4.sinks.k4.hdfs.roundUnit = hour
#是否使用本地时间戳
#a4.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a4.sinks.k4.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a4.sinks.k4.hdfs.fileType = DataStream
#多久生成一个新的文件
a4.sinks.k4.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a4.sinks.k4.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a4.sinks.k4.hdfs.rollCount = 0
#配置channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100a4.channels.c2.type = memory
a4.channels.c2.capacity = 1000
a4.channels.c2.transactionCapacity = 100a4.channels.c3.type = memory
a4.channels.c3.capacity = 1000
a4.channels.c3.transactionCapacity = 100a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100
#配置关联关系
a4.sources.r1.channels = c1 c2 c3 c4
a4.sinks.k1.channel = c1
a4.sinks.k2.channel = c2
a4.sinks.k3.channel = c3
a4.sinks.k4.channel = c4
10、启动脚本监控
[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a4 --conf-file job/taildir-hdfs-all.conf
Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access
+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a4 --conf-file job/taildir-hdfs-all.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
11、测试各类型是否可以正常加载 (1)页面类型
[peizk@hadoop test_log]$ cp ../log_data/page_data/app_page_log_20220101.log ./
文章图片
(2)action类型
[peizk@hadoop test_log]$ cp ../log_data/actions_data/app_actions_log_20220101.log ./
文章图片
(3)login 类型
[peizk@hadoop test_log]$ cp ../log_data/login_data/app_login_log_20220101.log./
文章图片
(4)exposure 类型
[peizk@hadoop test_log]$ cp ../log_data/exposure_data/app_exposure_log_20220101.log./
文章图片
12、编写一个flume 启停脚本 如下
[peizk@hadoop bin]$ vim flum.sh
#! /bin/bashcase $1 in
"start"){
for i in hadoop
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /home/peizk/app/flume-1.9.0/bin/flume-ng agent --conf-file /home/peizk/app/flume-1.9.0/job/taildir-hdfs-all.conf --name a3 -Dflume.root.logger=INFO,LOGFILE2>&1&"
done
};
;
"stop"){
for i in hadoop
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep taildir-hdfs-all.conf | grep -v grep |awk'{print \$2}' | xargs -n1 kill -9 "
done};
;
esac
13、测试一下脚本
六、将数据加载到hive表 1、编写加载到hive的project库下表的脚本 内容如下:
#!/bin/bash
# 如果传入日期则etl_date等于传入的日期,否则赋值前一天日期
if [ -n "$2" ] ;
then
etl_date=$2
else
etl_date=`date -d "-1 day" +%F`
fi case $1 in
"page")
echo "------------开始加载$1表${etl_date}分区数据----------------"
hive-e"load data inpath '/user/flume/app-page/${etl_date}/*'into table project.ods_log_page_incr partition(pt='${etl_date}')"
;
;
"actions")
echo "------------开始加载$1表${etl_date}分区数据----------------"
hive-e"load data inpath '/user/flume/app-actions/${etl_date}/*'into table project.ods_log_actions_incr partition(pt='${etl_date}')"
;
;
"login")
echo "------------开始加载$1表${etl_date}分区数据----------------"
hive-e"load data inpath '/user/flume/app-login/${etl_date}/*'into table project.ods_log_login_incr partition(pt='${etl_date}')"
;
;
"exposure")
echo "------------开始加载$1表${etl_date}分区数据----------------"
hive-e"load data inpath '/user/flume/app-exposure/${etl_date}/*'into table project.ods_log_exposure_incr partition(pt='${etl_date}')"
;
;
"all")
echo "------------开始加载全部日志表${etl_date}分区数据----------------"
hive-e"load data inpath '/user/flume/app-page/${etl_date}/*'into table project.ods_log_page_incr partition(pt='${etl_date}')"hive-e"load data inpath '/user/flume/app-actions/${etl_date}/*'into table project.ods_log_actions_incr partition(pt='${etl_date}')"hive-e"load data inpath '/user/flume/app-login/${etl_date}/*'into table project.ods_log_login_incr partition(pt='${etl_date}')"hive-e"load data inpath '/user/flume/app-exposure/${etl_date}/*'into table project.ods_log_exposure_incr partition(pt='${etl_date}')"esac
2、执行脚本
[peizk@hadoop job]$ sh load_hive.sh all 2022-01-01
------------开始加载全部日志表2022-01-01分区数据----------------
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 0d0cd9d8-c692-49d0-856b-2b3b12d0969fLogging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 76e99fe1-f958-4264-b171-d7732719b551
Loading data to table project.ods_log_page_incr partition (pt=2022-01-01)
OK
Time taken: 1.962 seconds
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 2e8f8319-c819-4d0e-85d6-44bcd913fee5Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 81624ba6-94cd-43ae-8938-1c9ac040bb70
Loading data to table project.ods_log_actions_incr partition (pt=2022-01-01)
OK
Time taken: 1.936 seconds
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 898ac37e-0919-4d31-9870-574d5c5fdf00Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 43e6b9e3-4cb7-4590-a2e2-0e23731f28e8
Loading data to table project.ods_log_login_incr partition (pt=2022-01-01)
OK
Time taken: 1.952 seconds
which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 810102d2-3d20-4bcd-a0c2-34006c2b7ea2Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = 533d6173-a204-49c1-a654-a6d3ccd23c66
Loading data to table project.ods_log_exposure_incr partition (pt=2022-01-01)
OK
Time taken: 1.927 seconds
3、查看 hive表中数据
文章图片
导入成功!!!
推荐阅读
- 大数据|14-HBase的介绍、数据模型以及架构模型
- 大数据|大数据(Flume和Sqoop)
- 网络|Kubernetes核心概念总结
- 深度学习|#今日论文推荐#思考总结10年,图灵奖得主Yann LeCun指明下一代AI方向(自主机器智能)
- 服务器|GBase 8a集群启停工具
- #|jpa、hibernate、spring-data-jpa关系
- 大数据|2021世界机器人大赛— 青少年机器人设计大赛
- 基于开源流批一体数据同步引擎ChunJun数据还原—DDL解析模块的实战分享
- JAVA|初步认识JAVA