Kafka Connector深度解读之JDBC源连接器
摘要
项目需要使用Kafka Stream通过加载mysql数据库里面的数据,然后做一个类似于ETL数据过滤功能。以此将导入到某一个主题的kafka数据跟通过kafka connect连接mysql的数据库里面的数据进行过滤去重。
内容
一.kafka安装
- kafka连接器功能是kafka1.0版本以上引入的,我们首先需要查看对应版本是否支持connect(直观的方式是:bin目录包含connect,conf目录包含connect);
文章图片
conf目录下:
文章图片
- 我们使用版本:kafka_2.11-1.0.1.jar. 其中2.11是Scala版本,1.0.1是Kafka版本;
去网站:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
下载;
文章图片
选择对应的版本
文章图片
解压得到以下目录结构:
文章图片
得到:
文章图片
将插件中lib里面的jar文件提取出来,放到kafka的libs目录:
文章图片
三.将java的MySQL驱动拷贝到到kafka的libs目录
文章图片
四:connect-mysql-source.properties配置文件
将kafka-connect-jdbc中etc目录下文件复制到kafka的config目录下,并修改为connect-mysql-source.properties;
文章图片
拷贝到kafka的config下:
文章图片
根据本地数据源修改配置:
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
#connection.url=jdbc:mysql://192.168.101.3:3306/databasename?user=xxx&password=xxx
connection.url=jdbc:mysql://127.0.01:3306/us_app?user=root&password=root
table.whitelist=ocm_blacklist_number
#bulk为批量导入,此外还有incrementing和imestamp模式
mode=bulk
#timestamp.column.name=time
#incrementing.column.name=id
topic.prefix=connect-mysql-
配置说明参考:https://www.jianshu.com/p/9b1dd28e92f0
五.修改kafka目录下config/connect-standalone.properties.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=falseoffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
六.启动kafka connect
bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties
文章图片
注意:connect-standalone.sh为单节点模式,此外还有connect-distributed集群模式,使用集群模式则需修改connect-distributed.properties
七.消费kafka,查看是否导入成功
【Kafka Connector深度解读之JDBC源连接器】你可以启动一个消费者,从起始点开始消费connect-mysql-ocm_blacklist_number这个主题,如果能看到输出说明你的连接器配置成功了。
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic connect-mysql-ocm_blacklist_number --from-begin
文章图片
参考: https://blog.csdn.net/u014686399/article/details/84962377
推荐阅读
- 《深度倾听》第5天──「RIA学习力」便签输出第16期
- 深入浅出谈一下有关分布式消息技术(Kafka)
- 15.Kafka
- 深度解读(《秘密》(八)—健康的秘密)
- 影响深度思考的9种思维定式
- 为Google|为Google Cloud配置深度学习环境(CUDA、cuDNN、Tensorflow2、VScode远程ssh等)
- 深度学习-入门
- 养好一塘鱼——《深度思维》(三)
- OpenCV|OpenCV-Python实战(18)——深度学习简介与入门示例
- 讲给资深产品人跳槽用的21道深度好问题