ELK7.9 及Kafka 2.2.2 安装与配置
一、Kafka 2.2.2 1、server.properties 配置文件
broker.id=128
listeners=PLAINTEXT://cwbg001:9092
num.network.threads=3
num.io.threads=4
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
log.dirs=/home/kafka/kafka_2.12-2.2.2/data
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.32.128:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
二、Filebeat7.9.3 1、filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /home/elastic/elasticsearch-7.9.3/logs/elasticsearch_server.json
fields:
log_topic: 'prod-app-service-name-app-prod'
exclude_files: [".tar$",".tgz$",".gz$",".bz2$",".zip$"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
# output kafka
output.kafka:
hosts: ["192.168.32.128:9092"]
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: true
required_acks: 1
compression: gzip
max_message_bytes: 1000000
三、Logstash 7.9.1 1、logstash.conf
input {kafka {
bootstrap_servers => "192.168.32.128:9092"
group_id => "prod-app-consumergroup-793"
topics_pattern => "prod-app-service-name-app-prod"
codec => json
auto_offset_reset => "earliest"
consumer_threads => 3# number (optional), same with partition number, not the topic number, default: 1
decorate_events => true # boolean (optional), default: false
}
}filter { fingerprint {
target => "[@metadata][fingerprint]"
source => "[@metadata][kafka]"
key => "containerandcloudplatform"
method => "MD5"
concatenate_sources => true
} if [@metadata][kafka][topic] =~ "prod-app-*" {
grok { match => [ "[@metadata][kafka][topic]", "^prod-app-(?.*$)" ]}
#grok { match => [ "message", "(?【ELK7.9 及Kafka 2.2.2 安装与配置】^[^{]*?)(?{.*$)" ]}
mutate { add_field => {"json_segment" => "%{[message]}"}}mutate {
remove_field => [ "type", "source", "offset", "input_type", "plain_header","input", "@version","agent","log","fields"]
rename => { "index_name" => "[@metadata][es_index]" }
#rename => { "message" => "[@metadata][message]" }
rename => { "json_segment" => "[@metadata][json_segment]" }
}json {
source => "[@metadata][json_segment]"
}}if "_jsonparsefailure" in [tags] {
drop { }
}
}output {
stdout { codec => rubydebug { metadata => false }}
#elasticsearch {
#user => logstash
#password => "123456"
#hosts => ["192.168.32.128:9200"]
#index => "%{[@metadata][es_index]}-%{+YYYY.MM.dd}"
#document_id => "%{[@metadata][fingerprint]}"
##sniffing => true
#}
}
推荐阅读
- JS中的各种宽高度定义及其应用
- 参保人员因患病来不及到指定的医疗机构就医,能否报销医疗费用()
- MybatisPlus|MybatisPlus LambdaQueryWrapper使用int默认值的坑及解决
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- 经历了人生,才知道人生的艰难!及精彩!
- 罗塞塔石碑的意义(古埃及文字的起源,圣书体文字是如何被破解的)
- 以太坊中的计量单位及相互转换
- Spark|Spark 数据倾斜及其解决方案
- 2月读书感想及《战争风云》读后记
- 深入浅出谈一下有关分布式消息技术(Kafka)