使用log4j将数据流入flume

最近做了一个log抽取的项目,采用log4j+flume实现,在此分享记录一下。
准备 什么是flume?
flume是一个提供高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
flume提供了source、channel、sink三个组件,实现数据的抽取加载。一组source、channel、sink组成一个agent同步数据,可以通过并联、串联agent的方式来灵活的实现数据抽取。
更多flume的文章可参考:Flume系列文章
log4j+flume
log4j和flume整合,官方提供了两种appender将log4j的日志写入flume,分别是Log4J AppenderLoad Balancing Log4J Appender
Log4J Appender Log4J Appender将log数据发送到flume的一个avro source中,在flume中可以根据需求在下游接不同的sink。
【使用log4j将数据流入flume】Log4j Appender使用时,有以下的配置参数(加粗的是必须的):

参数名 默认值 描述
Hostname source的host地址,如:110.110.110.100
Port source的监听端口,如:9999
UnsafeMode false 如果为true,则添加程序不会在发送事件失败时引发异常。
AvroReflectionEnabled false 使用Avro Reflection序列化Log4j事件。
AvroSchemaUrl avro schema的url地址
Load Balancing Log4J Appender 将log数据发送到flume的多个avro source中。实现负载均衡。
使用时,有以下的配置参数(加粗的是必须的):
参数名 默认值 描述
Hosts sources的host:port。是以空格分隔的。如:10.10.10.10:9999 10.10.10.11:9999
Selector ROUND_ROBIN 选择机制。必须为ROUND_ROBIN,RANDOM或自定义FQDN。
MaxBackoff 表示负载均衡客户端将从未能消耗事件的节点退出的最长时间(以毫秒为单位)。
UnsafeMode false 如果为true,则添加程序不会在发送事件失败时引发异常。
AvroReflectionEnabled false 使用Avro Reflection序列化Log4j事件。.
AvroSchemaUrl avro schema的url地址

Load Balancing Log4J Appender相当于是实现了多个Log4J Appender来实现负载均衡。在flume端,
Load Balancing Log4J Appender需要配置多个avro source来监听输入。
具体实现 pom依赖
org.apache.flume.flume-ng-clients flume-ng-log4jappender 1.9.0 org.apache.flume flume-ng-sdk 1.9.0 log4j log4j 1.2.17 org.slf4j slf4j-log4j12 1.7.25 test

测试类
package com.upupfeng; import org.apache.log4j.Logger; public class Log4j2Flume { public static void main(String[] args) { Logger logger = Logger.getLogger(Log4j2Flume.class); logger.info("test"); } }

log4j.properties
log4j.rootLogger=debug,stdout,flume# 输出到控制台 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern =[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n# Log4j Appender log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname=192.168.168.200 log4j.appender.flume.Port=41414 log4j.appender.flume.UnsafeMode=true log4j.appender.flume.layout=org.apache.log4j.PatternLayout log4j.appender.flume.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n# Load Balancing Log4J Appender log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.flume2.Hosts = 192.168.168.200:9001 192.168.168.200:9002 192.168.168.200:9003 log4j.appender.flume2.Selector = ROUND_ROBIN log4j.appender.flume2.MaxBackoff = 30000 log4j.appender.flume2.UnsafeMode = true log4j.appender.flume2.Threshold=ERROR log4j.appender.flume2.layout=org.apache.log4j.PatternLayout log4j.appender.flume2.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n

flume-conf.properties agent的配置。Log4J Appender只需要配置一个agent;Load Balancing Log4J Appender要配置多个。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414# Describe the sink a1.sinks.k1.type = logger# Use a channel which buffers events in memory a1.channels.c1.type = memory ## 事件容量 a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1


对于Log4j Appender的方式,启动一个agent等待接收,运行代码即可在flume的sink端获得数据。
对于Load Balancing Log4J Appender的方式,启动多个agent等待接收,进行负载均衡的接收数据。
参考 http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#log4j-appender

    推荐阅读