《Apache|《Apache Kafka 实战》笔记 - 10.2.2 Connect - 单机

Kafka connect Kafka connector 负责将数据从外部系统转移到Kafka或从Kafka中转移到其他系统,比如Kafka Connect能够将文件系统中某些文件的内容全部灌入Kafka topic中或者是把Kafka topic中的消息导出到外部的数据库系统。
Kafka Connect 主要由 source connector 和 sink connector 组成,source connector 负责把输入数据从外部系统中导入到Kafka中,而 sink connector 则负责把输出数据导出到其他外部系统。
目标 下面在一个单节点的Kafka集群上运行 standalone 模式的 Kafka Connect,把输入文件 foo.txt 中的数据通过Kafka传输到输出文件 bar.txt 中。
创建配置文件 test-connect-file-source.properties

name=test-file-source connector.class=FileStreamSource tasks.max=1 file=foo.txt topic=connect-file-test

test-connect-file-sink.properties
name= test-file-sink connector.class=FileStreamSink tasks.max=1 file=bar.txt topics=connect-file-test

启动 connect
bin/connect-standalone.sh config/connect-standalone.properties config/test-connect-file-source.properties config/test-connect-file-sink.properties

启动后会一直提示警告信息:
WARN Couldn't find file foo.txt for ...

这是因为源文件还不存在,向源文件中添加内容:
echo 'hello' >> ./foo.txt echo 'kafka connect test example' >> ./foo.txt echo 'this is a file connector test.' >> ./foo.txt

查看下目录,已经出现了 bar.txt,内容就是上面输入的。
使用 consumer 读取 topic 验证一下:
bin/kafka-console-consumer.sh--bootstrap-server localhost:9092 --topic connect-file-test --from-beginning

导出前修改消息数据 下面实验在将数据导出到目标文件之前为每条消息增加一个IP字段。
【《Apache|《Apache Kafka 实战》笔记 - 10.2.2 Connect - 单机】如果要插入IP静态字段,我们必须修改source connector的配置文件,增加以下这些行:
transforms=WrapMap,InsertHost transforms.WrapMap.type=org.apache.kafka.connect.transforms.HoistField$Value transforms.WrapMap.field=line transforms.InsertHost.type=org.apache.kafka.connect.transforms.InsertField$Value transforms.InsertHost.static.field=ip transforms.InsertHost.static.value=https://www.it610.com/article/com.connector.machine1

之后重启Kafka Connect,然后写入foo.txt文件:
echo "this is a transformation test" >> ./foo.txt

查看bar.txt可以发现这条新增的数据:
Struct{line=this is a transformation test,ip=com.connector.machine1}

    推荐阅读