Monitoring Kafka Consumer Group Lag
Kafka learning notes -- using the Java API to monitor the consumption lag (backlog) of Kafka consumer groups.
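The code below reads its settings through a `Property` helper class (not shown in the original post). As a minimal sketch of what a matching configuration file could look like -- every topic, group, and host name here is a placeholder assumption:

topics=orders,payments
broker=kafka01
servers=kafka01:9092,kafka02:9092
clientId=lag-monitor
# minutes to sleep between two monitoring rounds (multiplied by 60000 ms in the code)
sleepTime=5
# each topic key lists the consumer groups to monitor for that topic
orders=order-group-a,order-group-b
payments=payment-group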
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.network.BlockingChannel;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;

/**
 * @date 2020/1/6 10:04
 * @description Fetch the accumulated (lagging) offsets of the given Kafka consumer groups and send an alert email.
 */
public class KafkaOffsetTools {
    private static final Logger logger = LoggerFactory.getLogger(KafkaOffsetTools.class);
    public static void main(String[] args) throws InterruptedException {
        String topics = Property.getProperty("topics");
        String broker = Property.getProperty("broker");
        int port = 9092;
        String servers = Property.getProperty("servers");
        String clientId = Property.getProperty("clientId");
        int correlationId = 0;
        while (true) {
            List<String> brokerlist = new ArrayList<>();
            brokerlist.add(broker);
            KafkaOffsetTools kafkaOffsetTools = new KafkaOffsetTools();
            String[] topicArgs = topics.split(",");
            StringBuilder sb = new StringBuilder();
            for (String topic : topicArgs) {
                // partition id -> partition metadata (leader broker, replicas, ...) for this topic
                TreeMap<Integer, PartitionMetadata> metadatas = kafkaOffsetTools.findLeader(brokerlist, port, topic);
                List<TopicAndPartition> partitions = new ArrayList<>();
                for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
                    int partition = entry.getKey();
                    TopicAndPartition testPartition = new TopicAndPartition(topic, partition);
                    partitions.add(testPartition);
                }
                // the topic's property value lists the consumer groups to monitor for it
                String groups = Property.getProperty(topic);
                String[] groupArgs = groups.split(",");
                sb.setLength(0);
                BlockingChannel channel = new BlockingChannel(broker, port, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000);
                for (String group : groupArgs) {
                    long sumLogSize = 0L;
                    long sumOffset = 0L;
                    long lag = 0L;
                    KafkaConsumer<String, String> kafkaConsumer = kafkaOffsetTools.getKafkaConsumer(group, topic, servers);
                    for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
                        int partition = entry.getKey();
                        try {
                            channel.connect(); // no-op if the channel is already connected
                            OffsetFetchRequest fetchRequest = new OffsetFetchRequest(group, partitions, (short) 1, correlationId, clientId);
                            // Note: the response of this request is never read; the committed
                            // offsets are actually obtained via KafkaConsumer#committed below.
                            channel.send(fetchRequest.underlying());
                            /*
                             * The group's committed offset: since Kafka 0.9, committed offsets can be
                             * stored in the internal __consumer_offsets topic on the brokers; Burrow,
                             * for example, obtains them by consuming __consumer_offsets via sarama.
                             */
                            OffsetAndMetadata committed = kafkaConsumer.committed(new TopicPartition(topic, partition));
                            // committed is null if the group has never committed an offset for this partition
                            long partitionOffset = committed == null ? 0L : committed.offset();
                            sumOffset += partitionOffset;
                            /*
                             * We also need the broker-side (log-end) offset of each partition the group
                             * consumes, i.e. the number of messages actually produced; this has to be
                             * fetched periodically (sarama makes it just as easy to obtain).
                             */
                            String leadBroker = entry.getValue().leader().host();
                            String clientName = "Client_" + topic + "_" + partition;
                            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
                            long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), consumer.clientId());
                            // running total of produced messages (log-end offsets)
                            sumLogSize += readOffset;
                            logger.info("group: " + group + " " + partition + ":" + readOffset);
                            consumer.close();
                        } catch (Exception e) {
                            logger.error("Error fetching offsets for " + topic + "-" + partition, e);
                            channel.disconnect();
                        }
                    }
}logger.info("logSize:" + sumLogSize);
logger.info("offset:" + sumOffset);
lag = sumLogSize - sumOffset;
logger.info("lag:" + lag);
sb.append("消费者组 " + group + " 积压的偏移量为: " + lag).append("\n");
}
String title = topic + " 消费者消费情况";
EmailSender emailSender = new EmailSender();
emailSender.sendMail(title, sb.toString());
}Thread.sleep(60000 * Integer.valueOf(Property.getProperty("sleepTime")));
}}/**
    /**
     * Build a KafkaConsumer instance for the given group.
     *
     * @param group   consumer group id
     * @param topic   topic name
     * @param servers bootstrap server list
     * @return KafkaConsumer
     */
    private KafkaConsumer<String, String> getKafkaConsumer(String group, String topic, String servers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("max.poll.records", 100);
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // this consumer is only used to read committed offsets; it never polls
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    private KafkaOffsetTools() {
    }

    /**
     * Get the latest (log-end) offset of a partition, i.e. the offset of the
     * next message to be produced.
     *
     * @param consumer   SimpleConsumer connected to the partition's leader
     * @param topic      topic name
     * @param partition  partition id
     * @param whichTime  timestamp; pass kafka.api.OffsetRequest.LatestTime() for the log-end offset
     * @param clientName client id
     * @return offset
     */
    private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            logger.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
    /**
     * Fetch metadata for every partition of the topic (partition id -> PartitionMetadata).
     *
     * @param seedBrokers broker host list
     * @param port        broker port
     * @param topic       topic name
     * @return TreeMap keyed by partition id
     */
    private TreeMap<Integer, PartitionMetadata> findLeader(List<String> seedBrokers, int port, String topic) {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<>();
        for (String broker : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                logger.error("Error communicating with broker [" + broker + "] to find leader for [" + topic + "]. Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return map;
    }
}
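A note on API age: the SimpleConsumer and BlockingChannel classes used above belong to the old Scala client and were removed in Kafka 2.0. On newer clusters the same lag computation can be done with AdminClient alone. Below is a minimal sketch, assuming a kafka-clients dependency of 2.5+ (listOffsets requires 2.5; listConsumerGroupOffsets requires 2.0) -- the servers and group values are placeholders:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AdminClientLagCheck {
    public static void main(String[] args) throws Exception {
        String servers = "kafka01:9092"; // placeholder: your bootstrap servers
        String group = "order-group-a";  // placeholder: the group to inspect

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        try (AdminClient admin = AdminClient.create(props)) {
            // committed offsets for every partition the group has committed to
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();

            // log-end offsets for the same partitions (the broker resolves leaders for us)
            Map<TopicPartition, OffsetSpec> query = new HashMap<>();
            committed.keySet().forEach(tp -> query.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResultInfo> logEnd = admin.listOffsets(query).all().get();

            long totalLag = 0L;
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                long lag = logEnd.get(e.getKey()).offset() - e.getValue().offset();
                totalLag += lag;
                System.out.println(e.getKey() + " lag=" + lag);
            }
            System.out.println("total lag for group " + group + ": " + totalLag);
        }
    }
}

This removes the manual leader lookup entirely and computes per-partition lag the same way kafka-consumer-groups.sh does: log-end offset minus committed offset.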
Source: https://xuemengran.blog.csdn.net/article/details/103875884