Reading data from Kafka with Logstash and writing it to Elasticsearch (qbit)

Tech stack

  • OS: Ubuntu 20.04 LTS
  • docker: 20.10.12
  • docker-compose: 1.25.0
  • Elasticsearch: 7.16.3
  • Logstash: 7.16.3
  • kafka: 2.13-2.8.1
  • Python: 3.8.2
  • kafka-python: 2.0.2

Setting up Logstash with Docker

Official documentation
  • Docker image pull: https://www.elastic.co/guide/...
  • Docker image configuration: https://www.elastic.co/guide/...
  • Docker image directory layout: https://www.elastic.co/guide/...
Configuration steps
  • Pull the image
docker pull docker.elastic.co/logstash/logstash:7.16.3

  • Logstash configuration file /home/qbit/logstash/settings/logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.1.46:9200" ]

  • Pipeline configuration file /home/qbit/logstash/pipeline/es-pipeline.conf (the host directory /home/qbit/logstash/pipeline/ is mounted into the container at /usr/share/logstash/pipeline/); the timezone conversion done by the ruby filter is sketched in Python after these setup steps
input {
    kafka {
        codec => json
        bootstrap_servers => "192.168.1.46:9092"
        topics => ["coder_topic"]
    }
}
filter {
    mutate {
        add_field => { "timestamp" => "%{@timestamp}" }
        remove_field => ["@version"]
    }
    date {
        match => [ "timestamp", "ISO8601" ]  # matching on @timestamp directly here would fail
        target => "time0"
    }
    ruby {
        code => "
            time1 = event.get('@timestamp').time.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
            time2 = Time.parse(event.get('timestamp')).getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
            time3 = Time.now.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
            event.set('time1', time1)
            event.set('time2', time2)
            event.set('time3', time3)
        "
    }
}
output {
    stdout { codec => json_lines }
    elasticsearch {
        hosts => ["192.168.1.46:9200"]
        index => "coder_index"
        document_id => "%{id}"
    }
}

  • Create the container
docker run --rm -it --name logstash \
    -v /home/qbit/logstash/pipeline/:/usr/share/logstash/pipeline/ \
    -v /home/qbit/logstash/settings/logstash.yml:/usr/share/logstash/config/logstash.yml \
    docker.elastic.co/logstash/logstash:7.16.3
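As an aside, the ruby filter in es-pipeline.conf above does nothing more than shift the UTC event timestamp to +08:00 and reformat it. A minimal Python sketch of the same conversion, using the example timestamp from the query result further below, looks roughly like this (illustration only, not part of the pipeline):

# Rough Python equivalent of the ruby filter's timezone conversion (illustration only).
from datetime import datetime, timedelta, timezone

utc = datetime(2022, 1, 28, 1, 3, 40, tzinfo=timezone.utc)   # like @timestamp
local = utc.astimezone(timezone(timedelta(hours=8)))          # shift to +08:00
print(local.strftime('%Y-%m-%dT%H:%M:%S+08'))                 # -> 2022-01-28T09:03:40+08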

Sending a message with Python
  • producer.py
# encoding: utf-8
# author: qbit
# date: 2022-01-28
# summary: send a message to Kafka
import json
from kafka import KafkaProducer

def producer():
    producer = KafkaProducer(
        bootstrap_servers="192.168.1.46:9092",
        key_serializer=lambda k: json.dumps(k).encode('utf8'),
        value_serializer=lambda v: json.dumps(v).encode('utf8'),
    )
    id = 'qbit'
    dic = {'id': f"{id}", 'age': '23'}
    producer.send(topic="coder_topic", key=id, value=dic)
    print(f"send key: {id}, value: {dic}")

if __name__ == "__main__":
    producer()

  • Output
# python3 producer.py
send key: qbit, value: {'id': 'qbit', 'age': '23'}
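If the document never shows up in Elasticsearch, it helps to first confirm that the message actually reached the topic. A throwaway kafka-python consumer, sketched here for debugging only (not part of the setup), can do that:

# encoding: utf-8
# Throwaway debugging consumer: print every message on coder_topic.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "coder_topic",
    bootstrap_servers="192.168.1.46:9092",
    auto_offset_reset="earliest",                              # start from the oldest message
    key_deserializer=lambda k: json.loads(k.decode('utf8')),   # mirrors producer.py's serializers
    value_deserializer=lambda v: json.loads(v.decode('utf8')),
)
for msg in consumer:
    print(f"key: {msg.key}, value: {msg.value}")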

Viewing the data in ES with Kibana
GET coder_index/_search

{ "_index": "coder_index", "_type": "_doc", "_id": "qbit", "_score": 1.0, "_source": { "id": "qbit", "age": "23", "@timestamp": "2022-01-28T01:03:40.733Z",// logstash event 时间戳 "timestamp":"2022-01-28T01:03:40.733Z", "time0":"2022-01-28T01:03:40.733Z", "time1":"2022-01-28T09:03:40+08", "time2":"2022-01-28T09:03:40+08", "time3":"2022-01-28T09:03:40+08"// filter 中 ruby 代码生成的时间戳 } }

This post originally appeared on qbit snap
