Capturing MySQL data into Kafka with flume-ng

Flume is a log collector. For a more detailed introduction, see the official site: http://flume.apache.org/

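I could not find a source for pulling data from SQL databases on the official Apache Flume site,
but a plugin on GitHub can be used instead: https://github.com/keedio/flume-ng-sql-source . Version 1.4.3 is built on Hibernate, so it should work with any relational database for which Hibernate has a dialect and a JDBC driver is available.
My test environment is Windows, so for configuring and running Kafka on Windows I followed http://blog.csdn.net/linsongbin1/article/details/48022941
That article is thorough, and I recommend it.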


The rest of this post covers how to start flume-ng on Windows and the configuration it needs.
Startup:
"F:\Java\jdk1.8.0_101\bin\java.exe" -Xmx512m -Dlog4j.configuration=file:///E:\apache-flume-1.6.0-bin\conf\log4j.properties -cp "E:\apache-flume-1.6.0-bin\lib\*;E:\apache-flume-1.6.0-bin\plugins.d\sql-source\lib\flume-ng-sql-source-1.4.3-SNAPSHOT.jar;E:\apache-flume-1.6.0-bin\plugins.d\sql-source\libext\mysql-connector-java-5.1.35-bin.jar" org.apache.flume.node.Application -f E:\apache-flume-1.6.0-bin\conf\sql-kafka-conf.properties -n a1

The main configuration file, sql-kafka-conf.properties:
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://127.0.0.1/test
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = password
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
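# Delay between two runs of the query, in milliseconds
a1.sources.src-1.run.query.delay=10000
# Status file in which the source keeps track of the last row it has read
a1.sources.src-1.status.file.path = E://apache-flume-1.6.0-bin
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query: $@$ is replaced with the last index read, starting from start.from;
# the incremental column (id here) should be the first column returned
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,name from test_user where id > $@$ order by id asc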
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
##############################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hellotest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Bind the sink and the source to the channel ch-1
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels = ch-1
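
To make the role of start.from and the $@$ placeholder in custom.query more concrete, here is a minimal Python sketch of incremental polling. It is a conceptual model, not the plugin's actual implementation: it assumes the source replaces $@$ with the last value seen in the first returned column (beginning with start.from), and it uses an in-memory SQLite table as a stand-in for the MySQL test_user table. The names poll_once and demo are hypothetical.

```python
import sqlite3

PLACEHOLDER = "$@$"
QUERY = "select id, name from test_user where id > $@$ order by id asc"


def poll_once(conn, query, last_index, max_rows=1000):
    """Run one polling cycle and return (rows, new_last_index)."""
    # Substitute the placeholder with the last index seen so far.
    sql = query.replace(PLACEHOLDER, str(int(last_index)))
    rows = conn.execute(sql).fetchmany(max_rows)
    if rows:
        # Assumption: the incremental column is the first column returned.
        last_index = rows[-1][0]
    return rows, last_index


def demo():
    # SQLite stands in for MySQL so the sketch runs without a server.
    conn = sqlite3.connect(":memory:")
    conn.execute("create table test_user (id integer primary key, name text)")
    conn.executemany("insert into test_user values (?, ?)", [(1, "a"), (2, "b")])

    last = 0  # plays the role of start.from
    first, last = poll_once(conn, QUERY, last)   # picks up rows 1 and 2

    conn.execute("insert into test_user values (3, 'c')")
    second, last = poll_once(conn, QUERY, last)  # picks up only the new row
    third, last = poll_once(conn, QUERY, last)   # nothing new to read
    return first, second, third, last
```

Each cycle only returns rows whose id is greater than the last one already read, which is why the query orders by id and why a row is not exported twice. In the real setup the delay between cycles comes from run.query.delay.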

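Pay attention to the channel bindings at the end of the configuration (highlighted in red in the original post): the source property is channels, with an s, while the sink property is channel, without one.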
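
Mistakes in the channel bindings are easy to make, so a quick check before starting the agent can help. The following Python sketch is only an illustration and is not part of Flume: parse_properties and check_wiring are hypothetical helpers, and the parser handles just simple key = value lines. It verifies that every source has a channels property and every sink has a channel property, and that both refer to a channel declared in a1.channels. The sample deliberately binds the sink to c1, a channel that was never declared.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()  # a later duplicate key wins
    return props


def check_wiring(props, agent="a1"):
    """Return a list of problems found in the channel bindings."""
    channels = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for src in props.get(f"{agent}.sources", "").split():
        # Sources use the plural property name: channels
        bound = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src}: 'channels' is not set")
        for ch in bound:
            if ch not in channels:
                problems.append(f"source {src}: unknown channel {ch}")
    for sink in props.get(f"{agent}.sinks", "").split():
        # Sinks use the singular property name: channel
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink}: 'channel' is not set")
        elif ch not in channels:
            problems.append(f"sink {sink}: unknown channel {ch}")
    return problems


SAMPLE = """
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
a1.sinks.k1.channel = c1
a1.sources.src-1.channels = ch-1
"""

print(check_wiring(parse_properties(SAMPLE)))
```

Changing the sink binding to ch-1 makes the returned list empty.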

With that, Flume is set up to import MySQL data into Kafka in near real time.
