ELK7.9 及Kafka 2.2.2 安装与配置

一、Kafka 2.2.2 1、server.properties 配置文件

broker.id=128 listeners=PLAINTEXT://cwbg001:9092 num.network.threads=3 num.io.threads=4 socket.send.buffer.bytes=1024000 socket.receive.buffer.bytes=1024000 socket.request.max.bytes=104857600 log.dirs=/home/kafka/kafka_2.12-2.2.2/data num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.32.128:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=3000

二、Filebeat7.9.3 1、filebeat.yml
filebeat.inputs: - type: log enabled: true paths: - /home/elastic/elasticsearch-7.9.3/logs/elasticsearch_server.json fields: log_topic: 'prod-app-service-name-app-prod' exclude_files: [".tar$",".tgz$",".gz$",".bz2$",".zip$"] filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false # output kafka output.kafka: hosts: ["192.168.32.128:9092"] topic: '%{[fields.log_topic]}' partition.round_robin: reachable_only: true required_acks: 1 compression: gzip max_message_bytes: 1000000

三、Logstash 7.9.1 1、logstash.conf
input {kafka { bootstrap_servers => "192.168.32.128:9092" group_id => "prod-app-consumergroup-793" topics_pattern => "prod-app-service-name-app-prod" codec => json auto_offset_reset => "earliest" consumer_threads => 3# number (optional), same with partition number, not the topic number, default: 1 decorate_events => true # boolean (optional), default: false } }filter { fingerprint { target => "[@metadata][fingerprint]" source => "[@metadata][kafka]" key => "containerandcloudplatform" method => "MD5" concatenate_sources => true } if [@metadata][kafka][topic] =~ "prod-app-*" { grok { match => [ "[@metadata][kafka][topic]", "^prod-app-(?.*$)" ]} #grok { match => [ "message", "(?【ELK7.9 及Kafka 2.2.2 安装与配置】^[^{]*?)(?{.*$)" ]} mutate { add_field => {"json_segment" => "%{[message]}"}}mutate { remove_field => [ "type", "source", "offset", "input_type", "plain_header","input", "@version","agent","log","fields"] rename => { "index_name" => "[@metadata][es_index]" } #rename => { "message" => "[@metadata][message]" } rename => { "json_segment" => "[@metadata][json_segment]" } }json { source => "[@metadata][json_segment]" }}if "_jsonparsefailure" in [tags] { drop { } } }output { stdout { codec => rubydebug { metadata => false }} #elasticsearch { #user => logstash #password => "123456" #hosts => ["192.168.32.128:9200"] #index => "%{[@metadata][es_index]}-%{+YYYY.MM.dd}" #document_id => "%{[@metadata][fingerprint]}" ##sniffing => true #} }

    推荐阅读