java | Sending logs to Kafka asynchronously with Log4j2 (Docker configuration and usage for Kafka and its dependencies)

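I. Kafka config
1. Download and install Docker Desktop on Windows
https://hub.docker.com/?overlay=onboarding
2. Set the host that the Kafka container advertises to the address of the local machine
export DOCKER_KAFKA_HOST=###.###.###.###
3. Pull the Kafka-related images in Docker and start the containers
We use docker-compose to install and start them. The docker-compose file is as follows: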

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    hostname: kafka
    image: wurstmeister/kafka:0.9.0.0-1
    ports:
      - "9092:9092"
      - "9093:9093"
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_KAFKA_HOST}
      JMX_PORT: 9093
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 1
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_DELETE_RETENTION_MS: 1000
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafka-manager:
    container_name: kafka-manager
    hostname: kafka-manager
    image: sheepkiller/kafka-manager
    ports:
      - "9000:9000"
    links:
      - zookeeper
      - kafka
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: letmein
  kafdrop:
    container_name: kafdrop
    hostname: kafdrop
    image: thomsch98/kafdrop
    ports:
      - "9010:9010"
    environment:
      ZK_HOSTS: zookeeper:2181
      LISTEN: 9010

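Run docker-compose up in the directory that contains this file to start the Kafka-related containers. Output like the following means they started successfully: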
kafka-manager| [info] k.m.a.KafkaManagerActor - Updating internal state...
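
Besides reading the container output, the published ports can be probed from the host. The following is a minimal sketch that uses only the JDK; the class name PortCheck and the 1000 ms timeout are assumptions made for illustration, while the port numbers are the host ports from the docker-compose file above. A successful TCP connection only shows that something is listening on the port, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the original setup): probes the host ports
// published by the docker-compose file.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, and so on.
            return false;
        }
    }

    public static void main(String[] args) {
        // Service names and host ports as declared in the compose file.
        String[] names = {"zookeeper", "kafka", "kafka-manager", "kafdrop"};
        int[] ports = {2181, 9092, 9000, 9010};
        for (int i = 0; i < ports.length; i++) {
            boolean up = isReachable("localhost", ports[i], 1000);
            System.out.println(names[i] + " (" + ports[i] + "): "
                    + (up ? "reachable" : "not reachable"));
        }
    }
}
```

If kafka shows as not reachable while the other services are fine, the value of DOCKER_KAFKA_HOST is a reasonable first thing to check.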

4. Configure the cluster and topic
Open http://localhost:9000/ (kafka-manager), add a cluster, and create a topic.

II. Project config (Spring Boot)
1. Add the required jars
gradle:
configurations {
    // remove the default logger
    all*.exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
}

dependencies {
    ...
    compile ('org.springframework.boot:spring-boot-starter-log4j2')
    compile ('org.springframework.boot:spring-boot-configuration-processor:2.2.1.RELEASE')
    compile ('org.projectlombok:lombok:1.18.10')
    annotationProcessor ('org.projectlombok:lombok:1.18.10')
    // kafka clients
    compile ('org.apache.kafka:kafka-clients:0.9.0.1')
    // required when the log4j2 configuration file is in yml format
    compile ('com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.10.0')
    compile ('com.fasterxml.jackson.core:jackson-databind:2.10.0')
    compile ('com.fasterxml.jackson.core:jackson-core:2.10.0')
    // required for log4j2 asynchronous output
    compile ('com.lmax:disruptor:3.4.2')
    ...
}
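
The disruptor jar is what Log4j2 uses to hand log events from application threads to a background thread. To make the idea of asynchronous output concrete, here is a minimal sketch that swaps in a plain JDK BlockingQueue for the Disruptor ring buffer. It is an illustration of the general pattern only, not how Log4j2 is implemented; the class AsyncLogSketch and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of asynchronous logging with a bounded queue.
final class AsyncLogSketch implements AutoCloseable {
    // Unique sentinel object; compared by identity so no real message can match it.
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> queue;
    // Stands in for a slow destination such as a file or a Kafka topic.
    private final List<String> sink = Collections.synchronizedList(new ArrayList<>());
    private final Thread worker;

    public AsyncLogSketch(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
        worker = new Thread(this::drain, "async-log-worker");
        worker.start();
    }

    /** Called by application threads: only enqueues, and blocks if the buffer is full. */
    public void log(String message) throws InterruptedException {
        queue.put(message);
    }

    /** Background thread: takes messages in order and writes them to the sink. */
    private void drain() {
        try {
            String message;
            while ((message = queue.take()) != STOP) {
                sink.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Snapshot of everything written so far. */
    public List<String> delivered() {
        return new ArrayList<>(sink);
    }

    /** Flushes the queued messages and stops the background thread. */
    @Override
    public void close() throws InterruptedException {
        queue.put(STOP);
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncLogSketch logger = new AsyncLogSketch(8);
        logger.log("application started");
        logger.log("sending to kafka");
        logger.close();
        System.out.println(logger.delivered());
    }
}
```

The point of the pattern is that the calling thread pays only for the enqueue, while the slow write happens elsewhere. The trade-off is that events still in the buffer are lost if the process dies before they are drained.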

2. log4j2.yml configuration
Configuration:
  Properties:
    # Define global variables
    Property:
      - name: log-path
        value: "logs"
      - name: charset
        value: "UTF-8"
      - name: compact
        value: false
      - name: eventEol
        value: true
      # kafka topic name
      - name: kafka-topic
        value: neuralyzer
      # kafka host and port
      - name: bootstrap-servers
        value: 127.0.0.1:9092
      - name: complete
        value: false
      - name: stacktraceAsString
        value: true
      # Log output pattern
      - name: log.pattern
        value: "%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.} : %m%n"

  # Console output
  Appenders:
    Console:
      name: CONSOLE
      target: SYSTEM_OUT
      PatternLayout:
        pattern: ${log.pattern}

    RollingFile:
      - name: INFO_LOG
        fileName: ${log-path}/info.log
        filePattern: "${log-path}/historyLog/info-%d{yyyy-MM-dd}-%i.log.gz"
        PatternLayout:
          charset: ${charset}
          pattern: ${log.pattern}
        Filters:
          ThresholdFilter:
            # Reject error-level logs
            - level: error
              onMatch: DENY
              onMismatch: NEUTRAL
            # Reject warn-level logs
            - level: warn
              onMatch: DENY
              onMismatch: NEUTRAL
            # Accept info-level logs
            - level: info
              onMatch: ACCEPT
              onMismatch: DENY
        Policies:
          TimeBasedTriggeringPolicy:
            interval: 1
            modulate: true
        DefaultRolloverStrategy:
          max: 100

      - name: WARN_LOG
        fileName: ${log-path}/warn.log
        filePattern: "${log-path}/historyLog/warn-%d{yyyy-MM-dd}-%i.log.gz"
        PatternLayout:
          charset: ${charset}
          pattern: ${log.pattern}
        Filters:
          ThresholdFilter:
            - level: error
              onMatch: DENY
              onMismatch: NEUTRAL
            - level: warn
              onMatch: ACCEPT
              onMismatch: DENY
        Policies:
          TimeBasedTriggeringPolicy:
            interval: 1
            modulate: true
        DefaultRolloverStrategy:
          max: 100

      - name: ERROR_LOG
        fileName: ${log-path}/error.log
        filePattern: "${log-path}/historyLog/error-%d{yyyy-MM-dd}-%i.log.gz"
        PatternLayout:
          charset: ${charset}
          pattern: ${log.pattern}
        Filters:
          ThresholdFilter:
            - level: error
              onMatch: ACCEPT
              onMismatch: DENY
        Policies:
          TimeBasedTriggeringPolicy:
            interval: 1
            modulate: true
        DefaultRolloverStrategy:
          max: 100

    Kafka:
      - name: KAFKA_INFOLOG
        # kafka topic name, i.e. the name of the topic created above
        topic: ${kafka-topic}
        Property:
          # kafka host and port
          name: bootstrap.servers
          value: ${bootstrap-servers}
        JsonLayout:
          - charset: ${charset}
            compact: ${compact}
            complete: ${complete}
            stacktraceAsString: ${stacktraceAsString}
            eventEol: ${eventEol}
            properties: true
            KeyValuePair:
              # Links to the INFO_LOG configuration
              - key: tags
                value: INFO_LOG
              - key: project
                value: neuralyzer

      - name: KAFKA_WARNLOG
        topic: ${kafka-topic}
        Property:
          name: bootstrap.servers
          value: ${bootstrap-servers}
        JsonLayout:
          - charset: ${charset}
            compact: ${compact}
            complete: ${complete}
            stacktraceAsString: ${stacktraceAsString}
            eventEol: ${eventEol}
            properties: true
            KeyValuePair:
              - key: tags
                value: WARN_LOG
              - key: project
                value: neuralyzer

      - name: KAFKA_ERRORLOG
        topic: ${kafka-topic}
        Property:
          name: bootstrap.servers
          value: ${bootstrap-servers}
        JsonLayout:
          - charset: ${charset}
            compact: ${compact}
            complete: ${complete}
            stacktraceAsString: ${stacktraceAsString}
            eventEol: ${eventEol}
            properties: true
            KeyValuePair:
              - key: tags
                value: ERROR_LOG
              - key: project
                value: neuralyzer

  Loggers:
    # Configuring everything on the root logger lets the Lombok logging annotation work directly, so no LoggerFactory code is needed
    #AsyncRoot:
    # Configure asynchronous log output
    AsyncRoot:
      level: info
      # Include location in async mode so that location information appears in the log
      includeLocation: true
      AppenderRef:
        - ref: CONSOLE
        - ref: INFO_LOG
        - ref: WARN_LOG
        - ref: ERROR_LOG
        - ref: KAFKA_INFOLOG
        - ref: KAFKA_WARNLOG
        - ref: KAFKA_ERRORLOG
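
The ThresholdFilter chains on the three RollingFile appenders are the least obvious part of this configuration, so here is a small JDK-only model of how they route events. It rests on two assumptions about Log4j2 behaviour as I understand it: a ThresholdFilter matches when the event is at least as severe as its level, and filters inside Filters run in order with the first ACCEPT or DENY deciding the outcome. The class ThresholdChainSketch and its members are hypothetical names, and the model covers only the levels relevant here.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of the Filters blocks in log4j2.yml above.
final class ThresholdChainSketch {
    // Declared from most to least severe, so a smaller ordinal means more severe.
    public enum Level { ERROR, WARN, INFO, DEBUG }
    public enum Result { ACCEPT, DENY, NEUTRAL }

    /** One ThresholdFilter entry: level plus onMatch and onMismatch. */
    public static final class Threshold {
        final Level level;
        final Result onMatch;
        final Result onMismatch;

        public Threshold(Level level, Result onMatch, Result onMismatch) {
            this.level = level;
            this.onMatch = onMatch;
            this.onMismatch = onMismatch;
        }

        /** Matches when the event is at least as severe as the threshold level. */
        Result filter(Level event) {
            return event.ordinal() <= level.ordinal() ? onMatch : onMismatch;
        }
    }

    /** Runs the filters in order; the first ACCEPT or DENY wins. */
    public static Result decide(List<Threshold> chain, Level event) {
        for (Threshold threshold : chain) {
            Result result = threshold.filter(event);
            if (result != Result.NEUTRAL) {
                return result;
            }
        }
        return Result.NEUTRAL;
    }

    // The three chains, copied from the INFO_LOG, WARN_LOG and ERROR_LOG appenders.
    public static final List<Threshold> INFO_LOG = Arrays.asList(
            new Threshold(Level.ERROR, Result.DENY, Result.NEUTRAL),
            new Threshold(Level.WARN, Result.DENY, Result.NEUTRAL),
            new Threshold(Level.INFO, Result.ACCEPT, Result.DENY));
    public static final List<Threshold> WARN_LOG = Arrays.asList(
            new Threshold(Level.ERROR, Result.DENY, Result.NEUTRAL),
            new Threshold(Level.WARN, Result.ACCEPT, Result.DENY));
    public static final List<Threshold> ERROR_LOG = Arrays.asList(
            new Threshold(Level.ERROR, Result.ACCEPT, Result.DENY));

    public static void main(String[] args) {
        for (Level event : Level.values()) {
            System.out.println(event
                    + " -> INFO_LOG: " + decide(INFO_LOG, event)
                    + ", WARN_LOG: " + decide(WARN_LOG, event)
                    + ", ERROR_LOG: " + decide(ERROR_LOG, event));
        }
    }
}
```

Under this model each file receives exactly one level. The three Kafka appenders declare no Filters in the configuration above, so each of them would receive every event that the root logger passes at info and above, and the tags value alone does not separate the levels.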

3. Start the project
Console log:
Connected to the target VM, address: '127.0.0.1:62650', transport: 'socket'
2019-12-04 13:30:28.454 - INFO[main] o.a.k.c.c.AbstractConfig: ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [127.0.0.1:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 0
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
2019-12-04 13:30:28.460 - INFO[main] .a.k.c.u.AppInfoParser$AppInfo : Kafka version : 0.9.0.1
2019-12-04 13:30:28.460 - INFO[main] .a.k.c.u.AppInfoParser$AppInfo : Kafka commitId : 23c69d62a0cabf06

Kafka log:
[Screenshot: Kafka log]
