Kafka Connector深度解读之JDBC源连接器

摘要 项目需要使用Kafka Stream通过加载mysql数据库里面的数据,然后做一个类似于ETL数据过滤功能。以此将导入到某一个主题的kafka数据跟通过kafka connect连接mysql的数据库里面的数据进行过滤去重。
内容 一.kafka安装

  • kafka连接器功能是kafka1.0版本以上引入的,我们首先需要查看对应版本是否支持connect(直观的方式是:bin目录包含connect,conf目录包含connect);
bin目录下:
Kafka Connector深度解读之JDBC源连接器
文章图片

conf目录下:
Kafka Connector深度解读之JDBC源连接器
文章图片

  • 我们使用版本:kafka_2.11-1.0.1.jar. 其中2.11是Scala版本,1.0.1是Kafka版本;
二.下载kafka-connect-jdbc插件
去网站:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
下载;
Kafka Connector深度解读之JDBC源连接器
文章图片

选择对应的版本
Kafka Connector深度解读之JDBC源连接器
文章图片

解压得到以下目录结构:
Kafka Connector深度解读之JDBC源连接器
文章图片

得到:
Kafka Connector深度解读之JDBC源连接器
文章图片

将插件中lib里面的jar文件提取出来,放到kafka的libs目录:
Kafka Connector深度解读之JDBC源连接器
文章图片

三.将java的MySQL驱动拷贝到到kafka的libs目录
Kafka Connector深度解读之JDBC源连接器
文章图片

四:connect-mysql-source.properties配置文件
将kafka-connect-jdbc中etc目录下文件复制到kafka的config目录下,并修改为connect-mysql-source.properties;
Kafka Connector深度解读之JDBC源连接器
文章图片

拷贝到kafka的config下:
Kafka Connector深度解读之JDBC源连接器
文章图片

根据本地数据源修改配置:
# A simple example that copies all tables from a SQLite database. The first few settings are # required for all connectors: a name, the connector class to run, and the maximum number of # tasks to create: name=test-source-mysql-jdbc-autoincrement connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=10 # The remaining configs are specific to the JDBC source connector. In this example, we connect to a # SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to # detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g. # a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'. #connection.url=jdbc:mysql://192.168.101.3:3306/databasename?user=xxx&password=xxx connection.url=jdbc:mysql://127.0.01:3306/us_app?user=root&password=root table.whitelist=ocm_blacklist_number #bulk为批量导入,此外还有incrementing和imestamp模式 mode=bulk #timestamp.column.name=time #incrementing.column.name=id topic.prefix=connect-mysql-

配置说明参考:https://www.jianshu.com/p/9b1dd28e92f0
五.修改kafka目录下config/connect-standalone.properties.
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=falseoffset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000

六.启动kafka connect
bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties

Kafka Connector深度解读之JDBC源连接器
文章图片

注意:connect-standalone.sh为单节点模式,此外还有connect-distributed集群模式,使用集群模式则需修改connect-distributed.properties
七.消费kafka,查看是否导入成功
【Kafka Connector深度解读之JDBC源连接器】你可以启动一个消费者,从起始点开始消费connect-mysql-ocm_blacklist_number这个主题,如果能看到输出说明你的连接器配置成功了。
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic connect-mysql-ocm_blacklist_number --from-begin

Kafka Connector深度解读之JDBC源连接器
文章图片

参考: https://blog.csdn.net/u014686399/article/details/84962377

    推荐阅读