logback KafkaAppender: writing to a Kafka queue for centralized log output
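To reduce disk reads and writes on the application servers, and to gather logs on a single machine where ELK can collect them easily, I decided to build a jar that lets applications send their log output to one central place.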
Related post: Redis custom RedisAppender plugin, implementing a log buffer queue for centralized log output.
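I searched around online and found only one existing project, on GitHub:
https://github.com/johnmpage/logback-kafka
I originally planned to just reference that jar and be done, but it could not be downloaded through the pom. So I downloaded the source, copied the code over, and modified it myself.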
First, install Kafka. Being a supremely lazy programmer, I chose to install it with Docker.
Start the zookeeper container
docker run -d --name zookeeper --net=host -t wurstmeister/zookeeper
Start the kafka container
docker run --name kafka -d -e HOST_IP=192.168.1.7 --net=host -v /usr/local/docker/kafka/conf/server.properties:/opt/kafka_2.12-1.0.0/config/server.properties -v /etc/localtime:/etc/localtime:ro -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -t wurstmeister/kafka
The zookeeper setting (zookeeper.connect) in Kafka's server.properties needs to be changed.
The configuration file is as follows
listeners=PLAINTEXT://192.168.1.7:9092
delete.topic.enable=true
advertised.listeners=PLAINTEXT://192.168.1.7:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-92cfb0bbd88c
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.7:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
version=1.0.0
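Before creating the Spring Boot projects, it can help to confirm that the ports from the configuration above actually accept connections. The following is a minimal sketch using only the JDK. The class name `PortCheck` and the 2-second timeout are my own choices, not part of the original project, and a successful TCP connect only shows that the port is open, not that Kafka itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or host unknown: treat all as "not open"
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "192.168.1.7"; // address used in server.properties above
        System.out.println("zookeeper 2181 open: " + isOpen(host, 2181, 2000));
        System.out.println("kafka 9092 open: " + isOpen(host, 9092, 2000));
    }
}
```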
Once Kafka is up, create a new Spring Boot project. First, the one that consumes the queue.
pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lzw</groupId>
    <artifactId>kafkalog</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafkalog</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
[Figure: project structure]
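KafkaConfig
package com.lzw.kafkalog.config;

// Areceiver is not shown in this post; its package is assumed from the
// com.lzw.kafkalog.a logger declared in logback-test.xml.
import com.lzw.kafkalog.a.Areceiver;
import com.lzw.kafkalog.b.Breceiver;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by laizhenwei on 2017/11/28
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    // Keys are deserialized with StringDeserializer below, so the key type is
    // String (the original declared Integer, which did not match).
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public Areceiver areceiver() {
        return new Areceiver();
    }

    @Bean
    public Breceiver breceiver() {
        return new Breceiver();
    }
}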
KafkaAdminConfig
package com.lzw.kafkalog.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by laizhenwei on 2017/11/28
 */
@Configuration
public class KafkaAdminConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        return new KafkaAdmin(configs);
    }

    /**
     * Create topic A with 1 partition and a replication factor of 1.
     */
    @Bean
    public NewTopic a() {
        return new NewTopic("A", 1, (short) 1);
    }

    /**
     * Create topic B with 1 partition and a replication factor of 1.
     */
    @Bean
    public NewTopic b() {
        return new NewTopic("B", 1, (short) 1);
    }
}
Consumer for topic B
package com.lzw.kafkalog.b;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Created by laizhenwei on 2017/11/28
 */
public class Breceiver {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    // Writes every record received from topic B to the "com.lzw.kafkalog.b" logger
    @KafkaListener(topics = {"B"})
    public void listen(ConsumerRecord<?, ?> data) {
        logger.info(data.value().toString());
    }
}
application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.1.7:9092
    producer:
      bootstrap-servers: 192.168.1.7:9092
logback-test.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <contextName>logback</contextName>
    <property name="LOG_HOME" value="F:/log" />

    <appender name="aAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/a/a.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <!--<fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
            <!-- Number of days of log files to keep -->
            <maxHistory>30</maxHistory>
            <!-- File size that triggers rolling over to a new file -->
            <maxFileSize>100MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="bAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/b/b.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <!--<fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>-->
            <!-- Number of days of log files to keep -->
            <maxHistory>30</maxHistory>
            <!-- File size that triggers rolling over to a new file -->
            <maxFileSize>100MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- Asynchronous output -->
    <appender name="aAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="aAppender" />
    </appender>

    <logger name="com.lzw.kafkalog.a" level="INFO" additivity="false">
        <appender-ref ref="aAsyncFile" />
    </logger>

    <!-- Asynchronous output -->
    <appender name="bAsyncFile" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="bAppender" />
    </appender>

    <logger name="com.lzw.kafkalog.b" level="INFO" additivity="false">
        <appender-ref ref="bAsyncFile" />
    </logger>
</configuration>
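Next, the producer application. The key part is the KafkaAppender class, marked with a red box in the figure.
[Figure: producer project structure, KafkaAppender highlighted]
Here is the source of the highlighted class. I originally wanted to add fault tolerance to it, but that turned out not to work. The reason comes later.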
package com.lzw.project_b.kafka;

import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import ch.qos.logback.core.status.ErrorStatus;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.StringReader;
import java.util.Properties;

public class KafkaAppender<E> extends AppenderBase<E> {

    protected Layout<E> layout;
    // Logger named "local", bound to the local file appender in logback-test.xml
    private static final Logger LOGGER = LoggerFactory.getLogger("local");
    private boolean logToLocal = false;
    private String kafkaProducerProperties;
    private String topic;
    private KafkaProducer<String, String> producer;

    @Override
    public void start() {
        // Refuse to start without a layout. The original called super.start()
        // unconditionally before this check, which defeated it.
        if (this.layout == null) {
            this.addStatus(new ErrorStatus("No layout set for the appender named \"" + this.name + "\".", this));
            return;
        }
        LOGGER.info("Starting KafkaAppender...");
        final Properties properties = new Properties();
        try {
            // kafkaProducerProperties is plain "key=value" text, one pair per line
            properties.load(new StringReader(kafkaProducerProperties));
            producer = new KafkaProducer<>(properties);
        } catch (Exception exception) {
            System.out.println("KafkaAppender: Exception initializing Producer. " + exception + " : " + exception.getMessage());
        }
        System.out.println("KafkaAppender: Producer initialized: " + producer);
        if (topic == null) {
            System.out.println("KafkaAppender requires a topic. Add this to the appender configuration.");
        } else {
            System.out.println("KafkaAppender will publish messages to the '" + topic + "' topic.");
        }
        LOGGER.info("kafkaProducerProperties = {}", kafkaProducerProperties);
        LOGGER.info("Kafka Producer Properties = {}", properties);
        if (logToLocal) {
            LOGGER.info("KafkaAppender: kafkaProducerProperties = '" + kafkaProducerProperties + "'.");
            LOGGER.info("KafkaAppender: properties = '" + properties + "'.");
        }
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        LOGGER.info("Stopping KafkaAppender...");
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    protected void append(E event) {
        // The upstream source converts the event to JSON with a Formatter class here
        if (producer == null) {
            return; // the producer failed to initialize in start()
        }
        String msg = layout.doLayout(event);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
        producer.send(producerRecord);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean getLogToLocal() {
        return logToLocal;
    }

    public void setLogToLocal(String logToLocal) {
        if (Boolean.valueOf(logToLocal)) {
            this.logToLocal = true;
        }
    }

    public void setLayout(Layout<E> layout) {
        this.layout = layout;
    }

    public String getKafkaProducerProperties() {
        return kafkaProducerProperties;
    }

    public void setKafkaProducerProperties(String kafkaProducerProperties) {
        this.kafkaProducerProperties = kafkaProducerProperties;
    }
}
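The appender receives `kafkaProducerProperties` as one block of text and parses it with `Properties.load`, so each `key=value` pair has to sit on its own line in the appender configuration. Here is a small sketch of that parsing step using only the JDK. The class name `ProducerPropsDemo` and the sample strings are illustrative, not part of the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Illustrative only: mirrors the Properties.load(new StringReader(...)) call
// made in KafkaAppender.start().
final class ProducerPropsDemo {

    static Properties parse(String text) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(text));
        return properties;
    }

    public static void main(String[] args) throws IOException {
        // One pair per line; leading whitespace before a key is ignored
        Properties ok = parse("bootstrap.servers=192.168.1.7:9092\n"
                + "    retries=0\n");
        System.out.println(ok.getProperty("bootstrap.servers")); // 192.168.1.7:9092
        System.out.println(ok.getProperty("retries"));           // 0

        // Everything on one line: the rest of the line becomes a single value
        Properties broken = parse("bootstrap.servers=192.168.1.7:9092 retries=0");
        System.out.println(broken.getProperty("bootstrap.servers")); // 192.168.1.7:9092 retries=0
        System.out.println(broken.getProperty("retries"));           // null
    }
}
```

Note that `Properties.load` does not strip trailing whitespace from values, so it is safest to avoid trailing spaces on those lines in the XML.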
LogService just logs one long junk message
package com.lzw.project_b.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Created by laizhenwei on 2017/12/1
 */
@Component
public class LogService {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    // Long filler text so that every log event is large
    private static final String msg = "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa"
            + "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdf"
            + "sadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa"
            + "sdfsadfasdfsadfasdfsaasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa"
            + "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf"
            + "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas"
            + "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa"
            + "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas"
            + "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf"
            + "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa"
            + "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa";

    public void dolog() {
        // The trailing exception argument makes logback record a stack trace as well
        logger.info(msg, new RuntimeException(msg));
    }
}
KafkaLogController is just a dull endpoint that writes logs and records how long enqueueing takes
package com.lzw.project_b.controller;

import com.lzw.project_b.service.LogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by laizhenwei on 2017/11/29
 */
@RestController
@RequestMapping(path = "/kafka")
public class KafkaLogController {

    @Autowired
    private LogService logService;

    @GetMapping(path = "/aa")
    public void aa() {
        long begin = System.nanoTime();
        for (int i = 0; i < 100000; i++) {
            logService.dolog();
        }
        long end = System.nanoTime();
        // Elapsed time in milliseconds for handing 100000 events to the appender
        System.out.println((end - begin) / 1000000);
    }
}
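The controller measures elapsed time by subtracting two `System.nanoTime()` readings and dividing by 1,000,000 to get milliseconds. The same measurement can be wrapped in a small helper, sketched here with only the JDK. `ElapsedTimer` and the sample workload are hypothetical names for illustration. Because `producer.send` is asynchronous, a number obtained this way covers handing the events over, not their delivery to Kafka.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original project.
final class ElapsedTimer {

    /** Runs the task and returns the elapsed wall-clock time in milliseconds. */
    static long timeMillis(Runnable task) {
        long begin = System.nanoTime();
        task.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
    }

    public static void main(String[] args) {
        // Stand-in workload; in the controller this would be the logging loop
        long ms = timeMillis(() -> {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 100_000; i++) {
                sb.append(i);
            }
        });
        System.out.println("elapsed ms: " + ms);
    }
}
```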
Start both applications and send one request
[Figure: sending the request]
Check the elapsed time
[Figure: elapsed time]
The producer's logback-test.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <appender name="KAFKA" class="com.lzw.project_b.kafka.KafkaAppender">
        <topic>B</topic>
        <kafkaProducerProperties>
            bootstrap.servers=192.168.1.7:9092
            retries=0
            value.serializer=org.apache.kafka.common.serialization.StringSerializer
            key.serializer=org.apache.kafka.common.serialization.StringSerializer
            <!--reconnect.backoff.ms=1-->
            producer.type=async
            request.required.acks=0
            <!--acks=0-->
            <!--producer.type=async -->
            <!--request.required.acks=1 -->
            <!--queue.buffering.max.ms=20000 -->
            <!--queue.buffering.max.messages=1000-->
            <!--queue.enqueue.timeout.ms = -1 -->
            <!--batch.num.messages=8-->
            <!--metadata.fetch.timeout.ms=3000-->
            <!--producer.type=sync-->
            <!--request.required.acks=1-->
            <!--reconnect.backoff.ms=3000-->
            <!--retry.backoff.ms=3000-->
        </kafkaProducerProperties>
        <logToLocal>true</logToLocal>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
        </layout>
    </appender>

    <!-- Time-based rolling output for monitor-level logs -->
    <appender name="localAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>F:/localLog/b/b.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>F:/localLog/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>
            <!-- Number of days of log files to keep -->
            <maxHistory>30</maxHistory>
            <!-- File size that triggers rolling over to a new file -->
            <maxFileSize>200MB</maxFileSize>
            <totalSizeCap>10GB</totalSizeCap>
        </rollingPolicy>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="asyncLocal" class="ch.qos.logback.classic.AsyncAppender">
        <!-- Do not lose logs. By default, once the queue is 80% full, TRACE, DEBUG and INFO events are discarded -->
        <discardingThreshold>0</discardingThreshold>
        <queueSize>2048</queueSize>
        <appender-ref ref="localAppender"/>
    </appender>

    <!-- If the Kafka queue is unreachable, log locally -->
    <logger name="local" additivity="false">
        <appender-ref ref="asyncLocal"/>
    </logger>

    <!--<appender name="asyncKafka" class="ch.qos.logback.classic.AsyncAppender">-->
    <
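The `discardingThreshold` comment in this configuration describes the AsyncAppender rule: by default, once less than 20% of the queue is free, events at INFO level and below are dropped, and setting the threshold to 0 turns that off. The sketch below is a simplified JDK-only model of that rule, not logback's own class. `DiscardRuleDemo`, its `Level` enum and the `offer` method are hypothetical, and unlike the real appender, which by default blocks when the queue is completely full, this model simply returns false.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of the AsyncAppender discard rule; not logback code.
final class DiscardRuleDemo {

    enum Level { TRACE, DEBUG, INFO, WARN, ERROR }

    private final BlockingQueue<String> queue;
    private final int discardingThreshold;

    DiscardRuleDemo(int queueSize, int discardingThreshold) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.discardingThreshold = discardingThreshold;
    }

    /** Returns true if the event was queued, false if it was dropped. */
    boolean offer(Level level, String message) {
        boolean nearlyFull = queue.remainingCapacity() < discardingThreshold;
        boolean discardable = level.compareTo(Level.INFO) <= 0; // TRACE, DEBUG, INFO
        if (nearlyFull && discardable) {
            return false;
        }
        return queue.offer(message);
    }

    public static void main(String[] args) {
        // Threshold 2 corresponds to the default of queueSize / 5 for a queue of 10
        DiscardRuleDemo withDefault = new DiscardRuleDemo(10, 2);
        DiscardRuleDemo neverDiscard = new DiscardRuleDemo(10, 0);
        int keptDefault = 0;
        int keptZero = 0;
        for (int i = 0; i < 10; i++) {
            if (withDefault.offer(Level.INFO, "event " + i)) keptDefault++;
            if (neverDiscard.offer(Level.INFO, "event " + i)) keptZero++;
        }
        System.out.println("kept with threshold 2: " + keptDefault); // 9
        System.out.println("kept with threshold 0: " + keptZero);    // 10
    }
}
```

With the threshold at 0 no event is dropped by this rule, which is why the configurations in this post set it for both the file and the local fallback appenders.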