ELK+kafka+filebeat搭建生产ELFK集群 --wukong编辑器

非淡泊无以明志,非宁静无以致远。这篇文章主要讲述ELK+kafka+filebeat搭建生产ELFK集群 --wukong编辑器相关的知识,希望能为你提供帮助。

  • - -
作者:[SRE运维博客](https://www.cnsre.cn/ )
博客地址: [https://www.cnsre.cn/](https://www.cnsre.cn/ )
文章地址:??https://www.cnsre.cn/posts/210325135520/??
相关话题:??https://www.cnsre.cn/tags/elk/??
  • - -
ELK+kafka+filebeat搭建生产ELFK集群ELK 架构介绍
集群服务版本
服务|版本-|-java|1.8.0_221elasticsearch|7.10.1filebeat|7.10.1kibana|7.10.1logstash|7.10.1cerebro|0.9.2-1kafka|2.12-2.3.0zookeeper|3.5.6
服务器环境说明
IP地址|主机名|配置|角色-|-|-|-
  1. 0.11.172|elk-master|4C16G|es-master、kafka+zookeeper1
  2. 0.21.117|elk-node1|4C16G|es-node1、kafka+zookeeper2
  3. 0.11.208|elk-node2|4C16G|es-node2、kafka+zookeeper3
  4. 0.10.242|elk-kibana|4C16G|logstash、kibana、cerebro
系统参数优化{{< notice warning "注意" > }}三个节点都需要执行{{< /notice > }}
修改主机名
shell

hostnamectl set-hostname elk-master

hostnamectl set-hostname elk-node1

hostnamectl set-hostname elk-node2

增加文件描述符
shell

cat > > /etc/security/limits.conf< < EOF

*softnofile65536

*hardnofile65536

*softnproc65536

*hardnproc65536

*hardmemlockunlimited

*softmemlockunlimited

EOF


修改默认限制内存
shell

cat > > /etc/systemd/system.conf< < EOF

DefaultLimitNOFILE=65536

DefaultLimitNPROC=32000

DefaultLimitMEMLOCK=infinity

EOF


优化内核,对es支持
cat > > /etc/sysctl.conf< < EOF

# 关闭交换内存

vm.swappiness =0

# 影响java线程数量,建议修改为262144或者更高

vm.max_map_count= 262144

# 优化内核listen连接

net.core.somaxconn=65535

# 最大打开文件描述符数,建议修改为655360或者更高

fs.file-max=655360

# 开启ipv4转发

net.ipv4.ip_forward= 1

EOF


修改Hostname配置文件
shell

cat > > /etc/hosts< < EOF

elk-master10.0.11.172

elk-node110.0.21.117

elk-node210.0.11.208

EOF


重启使配置生效
shell

reboot

部署Zookeeper{{< notice warning "注意" > }}三个节点都需要执行{{< /notice > }}
创建Zookeeper项目目录
shell

#存放快照日志

mkdir zkdata
#存放事物日志

mkdir zklogs

下载解压zookeeper
shell

wgethttp://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
mv apache-zookeeper-3.5.6-bin zookeeper

修改配置文件
shell

[root@elk-master zookeeper]# cat conf/zoo.cfg|grep-v ^#

# 服务器之间或客户端与服务器之间维持心跳的时间间隔

# tickTime以毫秒为单位。

tickTime=2000
# 集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数

initLimit=10

# 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数

syncLimit=5

# 数据保存目录

dataDir=../zkdata

# 日志保存目录

dataLogDir=../zklogs

# 客户端连接端口

clientPort=2181

# 客户端最大连接数。# 根据自己实际情况设置,默认为60个

maxClientCnxns=60

# 三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口

server.1=10.0.11.172:2888:3888

server.2=10.0.21.117:2888:3888

server.3=10.0.11.208:2888:3888

写入节点标记
{{< notice warning "注意" > }}分别在三个节点??/home/tools/zookeeper/zkdata/myid??写入节点标记{{< /notice > }}{{< tabs master节点 node1节点 node2节点 > }}{{< tab > }}
master的操作
shell

echo "1" > /home/tools/zookeeper/zkdata/myid

{{< /tab > }}{{< tab > }}
node1的操作
shell

echo "2" > /home/tools/zookeeper/zkdata/myid

{{< /tab > }}{{< tab > }}
node2的操作
shell

echo "3" > /home/tools/zookeeper/zkdata/myid

{{< /tab > }}{{< /tabs > }}
启动zookeeper集群
shell

[root@elk-master zookeeper]# cd /home/tools/zookeeper/bin/

[root@elk-master bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default

Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

检查集群状态
[root@elk-master bin]# sh /home/tools/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default

Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg

Client port found: 2181. Client address: localhost.

Mode: leader


设置全局变量
shell

cat > > /etc/profile< < EOF

export ZOOKEEPER_INSTALL=/home/tools/zookeeper/

export PATH=$PATH:$ZOOKEEPER_INSTALL/bin

export PATH

EOF

  • 使配置生效
shell

source /etc/profile

这样就可以全局使用??zkServer.sh??命令了
部署 Kafka{{< notice warning "注意" > }}三个节点都需要执行{{< /notice > }}
下载解压kafka压缩包
[root@elk-master tools]# mkdir kafka

[root@elk-master tools]# cd kafka/

[root@elk-master kafka]# wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz

[root@elk-master kafka]# tar xf kafka_2.12-2.3.0.tgz
[root@elk-master kafka]# mv kafka_2.12-2.3.0 kafka

[root@elk-master kafka]# cd kafka/config/

配置kafka
shell
[root@elk-master config]# cat /home/tools/kafka/kafka/config/server.properties

############################# Server Basics #############################
# broker的id,值为整数,且必须唯一,在一个集群中不能重复

broker.id=1



############################# Socket Server Se:ttings #############################
# kafka默认监听的端口为9092 (默认与主机名进行连接)

listeners=PLAINTEXT://:9092



# 处理网络请求的线程数量,默认为3个

num.network.threads=3



# 执行磁盘IO操作的线程数量,默认为8个
num.io.threads=8



# socket服务发送数据的缓冲区大小,默认100KB

socket.send.buffer.bytes=102400



# socket服务接受数据的缓冲区大小,默认100KB

socket.receive.buffer.bytes=102400



# socket服务所能接受的一个请求的最大大小,默认为100M

socket.request.max.bytes=104857600



############################# Log Basics #############################
# kafka存储消息数据的目录

log.dirs=../kfkdata



# 每个topic默认的partition数量

num.partitions=3



# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量

num.recovery.threads.per.data.dir=1



############################# Log Flush Policy #############################


# 消息刷新到磁盘中的消息条数阈值

#log.flush.interval.messages=10000



# 消息刷新到磁盘中的最大时间间隔,1s

#log.flush.interval.ms=1000



############################# Log Retention Policy #############################


# 日志保留小时数,超时会自动删除,默认为7天

log.retention.hours=168



# 日志保留大小,超出大小会自动删除,默认为1G

#log.retention.bytes=1073741824



# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件

log.segment.bytes=1073741824



# 每隔多长时间检测数据是否达到删除条件,300s

log.retention.check.interval.ms=300000



############################# Zookeeper #############################
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开

zookeeper.connect=10.0.11.172,10.0.21.117,10.0.11.208



# 连接zookeeper的超时时间,6s

zookeeper.connection.timeout.ms=6000

创建数据存储的目录
shell

[root@elk-master config]# mkdir ../kfkdata

修改broker.id
{{< notice warning "注意" > }}分别在三个节点依次修改??/home/tools/kafka/kafka/config/server.properties??配置文件{{< /notice > }}{{< tabs master节点 node1节点 node2节点 > }}{{< tab > }}
master的配置
shell

broker.id=1

{{< /tab > }}{{< tab > }}
node1的配置
shell

broker.id=2

{{< /tab > }}{{< tab > }}
node2的配置
shell

broker.id=3

{{< /tab > }}{{< /tabs > }}

启动kafka集群
shell

cd /home/tools/kafka/kafka/bin/

#启动测试

./kafka-server-start.sh ../config/server.properties

#放入后台

./kafka-server-start.sh -daemon ../config/server.properties

测试
{{< notice warning "注意" > }}任意节点均可执行{{< /notice > }}在创建topic在集群中的任意节点 发布消息订阅消息验证结果

{{< tabs 创建topic 消息发布 topic消息订阅 > }}{{< tab > }}
shell

[root@elk-master bin]# ./kafka-topics.sh \\

--create \\

--zookeeper 10.0.11.172:2181,10.0.21.117:2181,10.0.11.208:2181 \\

--partitions 3 \\

--replication-factor 1 \\

--topic logs

{{< /tab > }}{{< tab > }}
shell

[root@elk-master bin]# ./kafka-console-producer.sh \\

--broker-list 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \\

--topic logs

{{< /tab > }}{{< tab > }}
shell

[root@elk-master bin]#./kafka-console-consumer.sh \\

--bootstrap-server 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \\

--topic logs \\

--from-beginning

{{< /tab > }}{{< /tabs > }}

部署elasticsearch{{< notice warning "注意" > }}三个节点都需要执行{{< /notice > }}
下载安装elasticsearch
shell

wget https://pan.cnsre.cn/d/Package/Linux/ELK/elasticsearch-7.10.1-x86_64.rpm

[root@elk-master package]#rpm -ivh elasticsearch-7.10.1-x86_64.rpm

备份配置文件
shell

cd /etc/elasticsearch

cp elasticsearch.ymlelasticsearch.yml.bak

修改配置文件
shell

cat > /etc/elasticsearch/elasticsearch.yml < < EOF

#集群名

cluster.name: elk-cluster



#node名

node.name: elk-1



#数据存储路径

path.data: /home/elasticsearch/esdata



#数据快照路径

path.repo: /home/backup/essnapshot



#日志存储路径

path.logs: /home/elasticsearch/eslogs



#es绑定的ip地址,根据自己机器ip进行修改

network.host: 0.0.0.0



#服务端口

http.port: 9200



#集群master需要和node名设置一致

discovery.seed_hosts: ["10.0.11.172", "10.0.21.117", "10.0.11.208"]

cluster.initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"]



#允许跨域请求

http.cors.enabled: true

#* 表示支持所有域名

http.cors.allow-origin: "*"

#添加请求header

http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type



#生产必须为true,内存锁定检查,目的是内存地址直接映射,减少一次copy时间

bootstrap.memory_lock: true

#系统过滤检查,防止数据损坏,考虑集群安全,生产设置成false

bootstrap.system_call_filter: false



#xpack配置

xpack.security.enabled: true

xpack.security.transport.ssl.enabled: true

xpack.security.transport.ssl.verification_mode: certificate

xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12

xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12

EOF

修改JVM
  • 将??jvm.options???文件中22-23行的??8g??设置为你的服务内存的一半
shell

[root@elk-node1 elasticsearch]# cat -njvm.options |grep 8g

22-Xms8g

23-Xmx8g

修改其他节点配置
{{< notice warning "注意" > }}分别在三个节点修改??/etc/elasticsearch/elasticsearch.yml??配置文件{{< /notice > }}{{< tabs master节点 node1节点 node2节点 > }}{{< tab > }}
master的配置
shell

node.name: "es-master"

{{< /tab > }}{{< tab > }}
node1的配置
shell

node.name: "es-node1"

{{< /tab > }}{{< tab > }}
node2的配置
shell

node.name: "es-node2"

{{< /tab > }}{{< /tabs > }}最终展示

分配权限
因为自定义数据、日志存储目录,所以要把权限给到目录
mkdir-p /home/elasticsearch/{esdata,eslogs}

chownelasticsearch:elasticsearch/home/elasticsearch/*

mkdir-p /home/backup/essnapshot

chown elasticsearch:elasticsearch /home/backup/essnapshot

启动服务
三个节点全部启动并加入开机启动
systemctl start elasticsearch

systemctl enableelasticsearch

使用xpack进行安全认证
xpack的安全功能
  • TLS 功能。 可对通信进行加密
  • 文件和原生 Realm。 可用于创建和管理用户
  • 基于角色的访问控制。 可用于控制用户对集群 API 和索引的访问权限
  • 通过针对 Kibana Spaces 的安全功能,还可允许在Kibana 中实现多租户。
在我配置过程中,发现集群认证需要首先配置秘钥才行,否则在给内置用户创建秘钥的时候将会报错。{{< notice warning "error" > }}Cause: Cluster state has not been recovered yet, cannot write to the [null] index{{< /notice > }}
shell

Unexpected response code [503] from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password?pretty

Cause: Cluster state has not been recovered yet, cannot write to the [null] index



Possible next steps:

* Try running this tool again.

* Try running with the --verbose parameter for additional messages.

* Check the elasticsearch logs for additional error details.

* Use the change password API manually.


ERROR: Failed to set password for user [apm_system].

申请证书{{< notice warning "注意" > }}下面的操作,在其中一个节点操作即可{{< /notice > }}
shell

/usr/share/elasticsearch/bin/elasticsearch-certutil ca

/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12

两条命令均一路回车即可,不需要给秘钥再添加密码证书创建完成之后,默认在es的数据目录。将证书拷贝到etc下,并给上权限。
[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*

/usr/share/elasticsearch/elastic-certificates.p12

/usr/share/elasticsearch/elastic-stack-ca.p12

cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/

chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*

做完之后,将证书拷贝到其他节点
为内置账号添加密码ES中内置了几个管理其他集成组件的账号??apm_system, beats_system, elastic, kibana, logstash_system, remote_monitoring_user??使用之前,首先需要设置下密码。
shell

/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive

Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.

You will be prompted to enter passwords as the process progresses.

Please confirm that you would like to continue [y/N]y

Enter password for [elastic]:

Reenter password for [elastic]:

Enter password for [apm_system]:

Reenter password for [apm_system]:

Enter password for [kibana]:

Reenter password for [kibana]:

Enter password for [logstash_system]:

Reenter password for [logstash_system]:

Enter password for [beats_system]:

Reenter password for [beats_system]:

Enter password for [remote_monitoring_user]:

Reenter password for [remote_monitoring_user]:

Changed password for user [apm_system]

Changed password for user [kibana]

Changed password for user [logstash_system]

Changed password for user [beats_system]

Changed password for user [remote_monitoring_user]

Changed password for user [elastic]

部署 Cerebro下载安装
shell

wget https://pan.cnsre.cn/d/Package/Linux/ELK/cerebro-0.9.2-1.noarch.rpm

rpm -ivh cerebro-0.9.2-1.noarch.rpm

修改配置文件
修改??/etc/cerebro/application.conf???配置文件找到对应配置修改为以下内容{{< codes 修改内容一 修改内容二> }}{{??}}??
shell

data.path: "/var/lib/cerebro/cerebro.db"

#data.path = "./cerebro.db"

??{{???}}{{??}}
shell

hosts = [

#{

#host = "http://localhost:9200"

#name = "Localhost cluster"

#headers-whitelist = [ "x-proxy-user", "x-proxy-roles", "X-Forwarded-For" ]

#}

# Example of host with authentication

{

host = "http://10.0.11.172:9200"

name = "elk-cluster"

auth = {

username = "elastic"

password = "123"

}

}

]

{{?
?}}{{}}
报错
{{< notice warning "error" > }}cerebro[8073]: No java installations was detected.{{< /notice > }}启动服务后报错??No java???,但是我的环境是有??JAVA??的。也做了全局变量感觉很奇怪...
解决方法{{< notice success "解决方法" > }}在启动服务文件中加入??JAVA_HOME??{{< /notice > }}
  • 找到服务启动文件??/usr/share/cerebro/bin/cerebro??
  • 修改??/usr/share/cerebro/bin/cerebro???中的??JAVA_HOME??
具体如下,根据自己的??JAVA_HOME???填写路径{{< codes 修改前 修改后> }}{{??}}??
shell

if [[ -n "$bundled_jvm" ]]; then

echo "$bundled_jvm/bin/java"

elif [[ -n "$JAVA_HOME" ]] & & [[ -x "$JAVA_HOME/bin/java" ]]; then

echo "$JAVA_HOME/bin/java"

else

echo "java"

fi

??{{???}}{{??}}
shell

if [[ -n "$bundled_jvm" ]]; then

echo "$bundled_jvm/bin/java"

elif [[ -n "/home/tools/jdk1.8.0_221" ]] & & [[ -x "/home/tools/jdk1.8.0_221/bin/java" ]]; then

echo "/home/tools/jdk1.8.0_221/bin/java"

else

echo "java"

fi

{{?
?}}{{}}
启动服务
shell

systemctlstart cerebro.service

systemctlenable cerebro.service

systemctlstatuscerebro.service

可以看到监听的是??9000??端口访问试下

部署Kibana下载安装
https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpm

rpm -ivh kibana-7.10.1-x86_64.rpm

修改备份配置文件
  • 备份配置文件
cd /etc/kibana/

mv kibana.yml kibana.yml.bak

  • 修改配置文件
vim kibana.yml

server.port: 5601

server.host: 0.0.0.0

elasticsearch.hosts: ["http://10.0.11.172:9200/","http://10.0.21.117:9200/","http://10.0.11.208:9200/"]

elasticsearch.username: "elastic"

elasticsearch.password: "123"

i18n.locale: "zh-CN"

启动服务器
shell

systemctlstartkibana.service

systemctlenablekibana.service

systemctlstatuskibana.service

访问WEB
访问??http://IP:5601??

部署filebeat
shell

wget https://pan.cnsre.cn/d/Package/Linux/ELK/filebeat-7.10.1-x86_64.rpm

rpm -ivh filebeat-7.10.1-x86_64.rpm
cd /etc/filebeat/

cp filebeat.yml filebeat.yml.bak

修改配置文件
修改filebeat配置文件,把日志推送到kafka
yaml

#=========================== Filebeat inputs =============================

max_procs: 1#限制filebeat的进程数量,其实就是内核数,避免过多抢占业务资源

queue.mem.events: 256# 存储于内存队列的事件数,排队发送 (默认4096)

queue.mem.flush.min_events: 128# 小于 queue.mem.events ,增加此值可提高吞吐量 (默认值2048)

filebeat.inputs:# inputs为复数,表名type可以有多个

- type: log# 输入类型

enable: true# 启用这个type配置

paths:

- /home/homeconnect/logs/AspectLog/aspect.log# 监控tomcat的业务日志

json.keys_under_root: true#默认Flase,还会将json解析的日志存储至messages字段

json.overwrite_keys: true#覆盖默认的key,使用自定义json格式的key

max_bytes: 20480# 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部)

fields:# 额外的字段

source: test-prod-tomcat-aspect-a# 自定义source字段,用于es建议索引(字段名小写,我记得大写好像不行)



- type: log# 输入类型

enable: true# 启用这个type配置

paths:

- /home/tools/apache-tomcat-8.5.23/logs/localhost_access_log.*.log# 监控tomcat access日志

json.keys_under_root: true#默认Flase,还会将json解析的日志存储至messages字段

json.overwrite_keys: true#覆盖默认的key,使用自定义json格式的key

max_bytes: 20480# 单条日志的大小限制,建议限制(默认为10M,queue.mem.events * max_bytes 将是占有内存的一部分)

fields:# 额外的字段

source: test-prod-tomcat-access-a# 自定义source字段,用于es建议索引



# 自定义es的索引需要把ilm设置为false

setup.ilm.enabled: false



#=============================== output ===============================

output.kafka:# 输出到kafka

enabled: true# 该output配置是否启用

hosts: ["10.0.11.172:9092","10.0.21.117:9092","10.0.11.208:9092"] # kafka节点列表

topic: logstash-%{[fields.source]}# kafka会创建该topic,然后logstash(可以过滤修改)会传给es作为索引名称

partition.hash:

reachable_only: true # 是否只发往可达分区

compression: gzip# 压缩

max_message_bytes: 1000000# Event最大字节数。默认1000000。应小于等于kafka broker message.max.bytes值

required_acks: 1# kafka ack等级

worker: 1# kafka output的最大并发数

bulk_max_size: 2048# 单次发往kafka的最大事件数

logging.to_files: true# 输出所有日志到file,默认true, 达到日志文件大小限制时,日志文件会自动限制替换



#=============================== other ===============================

close_older: 30m# 如果文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h

force_close_files: false # 这个选项关闭一个文件,当文件名称的变化。只在window建议为true

# 没有新日志采集后多长时间关闭文件句柄,默认5分钟,设置成1分钟,加快文件句柄关闭

close_inactive: 1m

# 传输了3h后荏没有传输完成的话就强行关闭文件句柄,这个配置项是解决以上案例问题的key point

close_timeout: 3h

# 这个配置项也应该配置上,默认值是0表示不清理,不清理的意思是采集过的文件描述在registry文件里永不清理,在运行一段时间后,registry会变大,可能会带来问题

clean_inactive: 72h

# 设置了clean_inactive后就需要设置ignore_older,且要保证ignore_older < clean_inactive

ignore_older: 70h

启动服务
shell

systemctlstartfilebeat.service

systemctlenablefilebeat.service

systemctlstatusfilebeat.service

部署logstash下载安装
shell

wget https://pan.cnsre.cn/d/Package/Linux/ELK/logstash-7.10.1-x86_64.rpm

rpm -ivh logstash-7.10.1-x86_64.rpm

mv logstash.yml logstash.yml.bak

修改配置文件
修改??logstash.yml??
yml

vim logstash.yml

http.host: "0.0.0.0"

# 指发送到Elasticsearch的批量请求的大小,值越大,处理则通常更高效,但增加了内存开销

pipeline.batch.size: 3000

# 指调整Logstash管道的延迟,过了该时间则logstash开始执行过滤器和输出

pipeline.batch.delay: 200

修改配置文件,从kafka获取日志
shell

[root@elk-kibana conf.d]# cat /etc/logstash/conf.d/get-kafka-logs.conf
input {# 输入组件

kafka {# 从kafka消费数据

bootstrap_servers => ["10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092"]

codec => "json"# 数据格式

#topics => ["3in1-topi"]# 使用kafka传过来的topic

topics_pattern => "logstash-.*"# 使用正则匹配topic

consumer_threads => 3# 消费线程数量

decorate_events => true# 可向事件添加Kafka元数据,比如主题、消息大小的选项,这将向logstash事件中添加一个名为kafka的字段

auto_offset_reset => "latest"# 自动重置偏移量到最新的偏移量

#group_id => "logstash-node"# 消费组ID,多个有相同group_id的logstash实例为一个消费组

#client_id => "logstash1"# 客户端ID

fetch_max_wait_ms => "1000"# 指当没有足够的数据立即满足fetch_min_bytes时,服务器在回答fetch请求之前将阻塞的最长时间

}

}



filter{

# 当非业务字段时,无traceId则移除

#if ([message] =~ "traceId=null") {# 过滤组件,这里只是展示,无实际意义,根据自己的业务需求进行过滤

#drop {}

#}

mutate {

convert => ["Request time", "float"]

}

if [ip] != "-" {

geoip {

source => "ip"

target => "geoip"

# database => "/usr/share/GeoIP/GeoIPCity.dat"

add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]

add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"]

}

mutate {

convert => [ "[geoip][coordinates]", "float"]

}

}

}





output {# 输出组件

elasticsearch {# Logstash输出到es

hosts => ["10.0.11.172:9200","10.0.21.117:9200","10.0.11.208:9200"]

index => "logstash-%{[fields][source]}-%{+YYYY-MM-dd}"# 直接在日志中匹配

#index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}"# 以日期建索引

user => "elastic"

password => "123"

}

#stdout {

#codec => rubydebug

#}

}

测试接收日志
测试是否能接收到数据
shell

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf

下边把??logstash???设置为使用??systemd???启动修改??/etc/systemd/system/logstash.service??文件
shell

[Unit]

Description=root



[Service]

Type=simple

User=root

Group=root

# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.

# Prefixing the path with - makes it try to load, but if the file doesnt

# exist, it continues onward.

EnvironmentFile=-/etc/default/logstash

EnvironmentFile=-/etc/sysconfig/logstash

ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"

Restart=alway

WorkingDirectort=/

Nice=19

LimitNOFILE=16384



[Install]

WantedBy=multi-user.target

在启动程序??/usr/share/logstash/bin/logstash.lib.sh???中加入??JAVA_HOME???在文件86行左右的??if [ -z "$JAVACMD" ]; then???代码上方插入一行??JAVACMD="/home/tools/jdk1.8.0_221/bin/java"??? 具体的路径需要你根据自己的??JAVA??来修改。
shell

[root@elk-kibana ~]# cat-n /usr/share/logstash/bin/logstash.lib.sh |grepJAVACMD

85# set the path to java into JAVACMD which will be picked up by JRuby to launch itself

86JAVACMD="/home/tools/jdk1.8.0_221/bin/java"

87if [ -z "$JAVACMD" ]; then

启动服务
shell

systemctl reload logstash.service

systemctl restartlogstash.service

systemctl enablelogstash.service

最后检查登录kibana创建索引
【ELK+kafka+filebeat搭建生产ELFK集群 --wukong编辑器】选择??管理???--?

    推荐阅读