监控Kafka消费者组的消费积压

Kafka学习笔记--使用Java API监控Kafka消费者组的消费积压

/** * @date 2020/1/6 10:04 * @description 获取Kafka指定消费者组堆积的偏移量, 并发送邮件预警 */ @Slf4j public class KafkaOffsetTools { private static final Logger logger = LoggerFactory.getLogger(Property.class); public static void main(String[] args) throws InterruptedException {String topics = Property.getProperty("topics"); String broker = Property.getProperty("broker"); int port = 9092; String servers = Property.getProperty("servers"); String clientId = Property.getProperty("clientId"); int correlationId = 0; while (true) { List brokerlist = new ArrayList<>(); brokerlist.add(broker); KafkaOffsetTools kafkaOffsetTools = new KafkaOffsetTools(); String[] topicArgs = topics.split(","); StringBuilder sb = new StringBuilder(); for (String topic : topicArgs) { TreeMap metadatas = kafkaOffsetTools.findLeader(brokerlist, port, topic); List partitions = new ArrayList<>(); for (Entry entry : metadatas.entrySet()) { int partition = entry.getKey(); TopicAndPartition testPartition = new TopicAndPartition(topic, partition); partitions.add(testPartition); } String groups = Property.getProperty(topic); String[] groupArgs = groups.split(","); sb.setLength(0); BlockingChannel channel = new BlockingChannel(broker, port, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000); for (String group : groupArgs) { long sumLogSize = 0L; long sumOffset = 0L; long lag = 0L; KafkaConsumer kafkaConsumer = kafkaOffsetTools.getKafkaConsumer(group, topic, servers); for (Entry entry : metadatas.entrySet()) { int partition = entry.getKey(); try { channel.connect(); OffsetFetchRequest fetchRequest = new OffsetFetchRequest(group, partitions, (short) 1, correlationId, clientId); channel.send(fetchRequest.underlying()); /* * 消费的commited offset, 针对kafka 0.9及以后的版本, 提交的offset可以选择保存在broker上的__consumer_offsets的内部topic上, Burrow还是通过sarama来消费__consumer_offsets这个topic来获取; */ OffsetAndMetadata committed = kafkaConsumer.committed(new TopicPartition(topic, partition)); long partitionOffset = 
committed.offset(); sumOffset += partitionOffset; //消费偏移量大小/* 需要获取各group的消费的topic的各个partition的broker offset,就是实际生产的msg的条数, 通过sarama可以轻松获取, 当然这个需要周期性不间断获取; * */ String leadBroker = entry.getValue().leader().host(); String clientName = "Client_" + topic + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); //long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), consumer.clientId()); sumLogSize += readOffset; //消息总大小logger.info("group: " + group + " " + partition + ":" + readOffset); consumer.close(); } catch (Exception e) { e.printStackTrace(); channel.disconnect(); } }logger.info("logSize:" + sumLogSize); logger.info("offset:" + sumOffset); lag = sumLogSize - sumOffset; logger.info("lag:" + lag); sb.append("消费者组 " + group + " 积压的偏移量为: " + lag).append("\n"); } String title = topic + " 消费者消费情况"; EmailSender emailSender = new EmailSender(); emailSender.sendMail(title, sb.toString()); }Thread.sleep(60000 * Integer.valueOf(Property.getProperty("sleepTime"))); }}/** * 获取Kafka消费者实例 * * @param group消费者组 * @param topic主题名 * @param servers 服务器列表 * @return KafkaConsumer */ private KafkaConsumer getKafkaConsumer(String group, String topic, String servers) { Properties props = new Properties(); props.put("bootstrap.servers", servers); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("max.poll.records", 100); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); return consumer; }private KafkaOffsetTools() { }/** * 获取该消费者组每个分区最后提交的偏移量 
* * @param consumer消费者组对象 * @param topic主题 * @param partition分区 * @param whichTime最晚时间 * @param clientName 客户端名称 * @return 偏移量 */ private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map requestInfo = new HashMap<>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; }/** * 获取每个partation的元数据信息 * * @param seedBrokers 服务器列表 * @param port端口号 * @param topic主题名 * @return TreeMap */ private TreeMap findLeader(List seedBrokers, int port, String topic) { TreeMap map = new TreeMap<>(); for (String broker : seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime()); List topics = Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); TopicMetadataResponse resp = consumer.send(req); List metaData = https://www.it610.com/article/resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + ", ] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } return map; }}


【监控Kafka消费者组的消费积压】监控Kafka消费者组的消费积压
文章图片

https://xuemengran.blog.csdn.net/article/details/103875884



    推荐阅读