Using Logstash to read data from Kafka and write it to Elasticsearch (qbit)
Tech stack
OS: Ubuntu 20.04 LTS
docker: 20.10.12
docker-compose: 1.25.0
Elasticsearch: 7.16.3
Logstash: 7.16.3
kafka: 2.13-2.8.1
Python: 3.8.2
kafka-python: 2.0.2
Setting up Logstash with Docker
- Official documentation
  - Pulling the Docker image: https://www.elastic.co/guide/...
  - Configuring the Docker image: https://www.elastic.co/guide/...
  - Directory layout of the Docker image: https://www.elastic.co/guide/...
- Pull the image
docker pull docker.elastic.co/logstash/logstash:7.16.3
- Logstash settings file
/home/qbit/logstash/settings/logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.1.46:9200" ]
- Pipeline configuration file
/home/qbit/logstash/pipeline/es-pipeline.conf (mounted into the container under /usr/share/logstash/pipeline/)
input {
  kafka {
    codec => json
    bootstrap_servers => "192.168.1.46:9092"
    topics => ["coder_topic"]
  }
}

filter {
  mutate {
    add_field => { "timestamp" => "%{@timestamp}" }
    remove_field => ["@version"]
  }
  date {
    match => [ "timestamp", "ISO8601" ]  # parsing @timestamp directly here raises an error
    target => "time0"
  }
  ruby {
    code => "
      time1 = event.get('@timestamp').time.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
      time2 = Time.parse(event.get('timestamp')).getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
      time3 = Time.now.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
      event.set('time1', time1)
      event.set('time2', time2)
      event.set('time3', time3)
    "
  }
}

output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => ["192.168.1.46:9200"]
    index => "coder_index"
    document_id => "%{id}"
  }
}
- Create the container
docker run --rm -it --name logstash \
  -v /home/qbit/logstash/pipeline/:/usr/share/logstash/pipeline/ \
  -v /home/qbit/logstash/settings/logstash.yml:/usr/share/logstash/config/logstash.yml \
  docker.elastic.co/logstash/logstash:7.16.3
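To make the time handling in the pipeline's ruby filter easier to follow, here is a rough Python equivalent of what time1 and time2 compute: take a UTC ISO8601 timestamp and render it at a fixed +08:00 offset. This is only a sketch for illustration. The function name to_local and the constant CST are made up here and are not part of the Logstash configuration.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset, mirroring getlocal('+08:00') in the ruby filter.
# CST is a hypothetical name chosen for this sketch.
CST = timezone(timedelta(hours=8))


def to_local(ts_utc: str) -> str:
    """Render a UTC ISO8601 timestamp in the '+08' form used by time1/time2."""
    # datetime.fromisoformat in Python 3.8 rejects a trailing 'Z',
    # so replace it with an explicit UTC offset first.
    parsed = datetime.fromisoformat(ts_utc.replace("Z", "+00:00"))
    # Shift to UTC+8 and drop the milliseconds, as the strftime pattern does.
    return parsed.astimezone(CST).strftime("%Y-%m-%dT%H:%M:%S+08")


print(to_local("2022-01-28T01:03:40.733Z"))  # 2022-01-28T09:03:40+08
```

Note that time3 in the filter is different: it is based on Time.now, that is, the moment the filter runs, not on the event's @timestamp.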
Sending a message with Python
producer.py
# encoding: utf-8
# author: qbit
# date: 2022-01-28
# summary: send a message to Kafka

import json

from kafka import KafkaProducer


def producer():
    producer = KafkaProducer(
        bootstrap_servers="192.168.1.46:9092",
        # Serialize both key and value as UTF-8 encoded JSON
        key_serializer=lambda k: json.dumps(k).encode('utf8'),
        value_serializer=lambda v: json.dumps(v).encode('utf8'),
    )
    id = 'qbit'
    dic = {'id': f"{id}", 'age': '23'}
    producer.send(topic="coder_topic", key=id, value=dic)
    # send() is asynchronous; flush so the message is delivered before the script exits
    producer.flush()
    print(f"send key: {id}, value: {dic}")


if __name__ == "__main__":
    producer()
- Output
# python3 producer.py
send key: qbit, value: {'id': 'qbit', 'age': '23'}
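It can help to see the exact bytes the two serializers in producer.py put on the wire, since these are what the pipeline's json codec has to decode. The sketch below uses only the standard library and reuses the same json.dumps(...).encode('utf8') expression. The helper name serialize is an assumption made for this example.

```python
import json


def serialize(obj) -> bytes:
    # Same expression as the key_serializer / value_serializer lambdas in producer.py.
    return json.dumps(obj).encode("utf8")


key_bytes = serialize("qbit")
value_bytes = serialize({"id": "qbit", "age": "23"})

print(key_bytes)    # b'"qbit"'
print(value_bytes)  # b'{"id": "qbit", "age": "23"}'

# Decoding the value as JSON recovers the original dict.
decoded = json.loads(value_bytes.decode("utf8"))
print(decoded)      # {'id': 'qbit', 'age': '23'}
```

One detail worth noticing: because the key also goes through json.dumps, the key stored in Kafka is the JSON string "qbit" including the double quotes, not the bare text qbit.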
Viewing the data in ES with Kibana
GET coder_index/_search
{
  "_index": "coder_index",
  "_type": "_doc",
  "_id": "qbit",
  "_score": 1.0,
  "_source": {
    "id": "qbit",
    "age": "23",
    "@timestamp": "2022-01-28T01:03:40.733Z",  // Logstash event timestamp
    "timestamp": "2022-01-28T01:03:40.733Z",
    "time0": "2022-01-28T01:03:40.733Z",
    "time1": "2022-01-28T09:03:40+08",
    "time2": "2022-01-28T09:03:40+08",
    "time3": "2022-01-28T09:03:40+08"  // timestamp generated by the ruby code in the filter
  }
}
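Because the pipeline sets document_id => "%{id}", the document's _id equals the id field of the message, so it can also be fetched directly by ID without Kibana. The following is a minimal sketch using only the standard library and Elasticsearch's get-document endpoint (GET /<index>/_doc/<id>). It assumes the cluster is reachable over plain HTTP without authentication, as in the settings above. The helper names doc_url and fetch_source are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def doc_url(host: str, index: str, doc_id: str) -> str:
    """Build the URL of the get-document endpoint: GET /<index>/_doc/<id>."""
    # Percent-encode index and ID so special characters cannot break the path.
    return f"http://{host}/{quote(index, safe='')}/_doc/{quote(doc_id, safe='')}"


def fetch_source(host: str, index: str, doc_id: str) -> dict:
    """Fetch one document and return its _source (requires a reachable cluster)."""
    with urlopen(doc_url(host, index, doc_id), timeout=5) as resp:
        return json.load(resp)["_source"]


print(doc_url("192.168.1.46:9200", "coder_index", "qbit"))
# http://192.168.1.46:9200/coder_index/_doc/qbit
```

Calling fetch_source("192.168.1.46:9200", "coder_index", "qbit") against the running cluster should return the same _source shown above. Sending another message with the same id would overwrite this document instead of creating a new one, since the ID is reused.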
This article is from qbit snap