本文介绍常用的topic管理命令,主要涉及kafka-topics脚本, kafka-reassign-partitions和kafka-config脚本,前者是专门的topic相关的脚本,中间的是分区重分配相关的脚本,后者是配置相关的脚本,不仅可以管理topic,还能管理broker,consumer等。
1.topic的管理:
增
kafka-topics --bootstrap-server broker_host:port --create --topic --partitions 1 --replication-factor 1
删
kafka-topics --bootstrap-server broker_host:port --delete --topic
查
- 查用户建的某个主题的详细参数describe
kafka-topics --bootstrap-server broker_host:port --describe --topic
- 查看所有的主题
kafka-topics --bootstrap-server broker_host:port --list
改
- 修改主题分区(数量只能增不能减,否则会跑出invalidPartitionsException的异常)
kafka-topics --bootstrap-server broker_host:port --alter --topic --partitions
- 修改主题级别的参数,比如max.message.bytes
kafka-configs --zookeeper zookeeper_host:port --entity-type topics --entity-name --alter-config max.message.bytes=100485760
- 变更副本数量
分为两个步骤:
1.提供一个json文件(reassign.json)去指定所有的topic的partition和replicas,replicas列表是broker_id,列表的第一个broker_id是leader replica所在的broker。
{"version":1, "partitions":[
{"topic":"topic_name","partition":0,"replicas":[0,1,2]},
{"topic":"topic_name","partition":1,"replicas":[0,2,1]},
{"topic":"topic_name","partition":2,"replicas":[1,0,2]},
{"topic":"topic_name","partition":3,"replicas":[1,2,0]}
]}
2.使用命令
kafka-reassign-partitions --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
- 修改主题限速
kafka-configs --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=1004857600,follower.replication.throttled.rate=104857600'
- 主题分区迁移
kafka-reassign-partitions
2.特殊的主题管理 内部主题:__consumer_offsets和__transaction_state
前者是记录consumer消费的进度的,后者是支持事物机制后引入的
- 查看位移提交数据
kafka-console-consumer --bootstrap-server host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
- 读取__consumer_offset主题消息,查看消费者组的状态信息
kafka-console-consumer --bootstrap-server host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
3.运行命令常见的错误 【学习笔记|【Kafka|常用CLI】Topic管理】1.删除主题失败
可能原因:
- broker down
- 删除的topic部分分区依然在执行迁移过程
- 手动删除zookeeper节点/admin/delete_topic/
这个znode - 手动删除该topic在磁盘上的分区目录
- 在zookeeper中执行rmr/controller,触发controller重选举,刷新controller缓存
可能原因:
- cleaner线程挂了,无法清理此内部主题
- 显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态,如果是这个原因导致的,就重启相应的 Broker
推荐阅读
- EECS 4412 Data Mining
- python|Python 学生信息管理系统------文章中源码100%真实有效-----如何将类、初始化属性、模块、循环判断、静态方法等一系列知识点结合起来做一个项目
- JavaWeb|基于Java开发的CMS内容管理系统
- 校招面试有感
- Linux script求解
- WebSocket在实时语音识别中的应用
- DATA7202 Statistical Methods
- Java基础|Java真的不难(三十)工厂模式(3)
- java|Tomcat错误页重定向