kafka消费者组cli

本文概述

  • 带Key的消费者
  • 有关消费者组的更多信息
  • 重置偏移
通常,Kafka消费者属于特定的消费者组。使用者组基本上代表了应用程序的名称。为了使用使用者组中的消息,使用了“ -group”命令。
让我们看看消费者将如何消费来自Kafka主题的消息:
步骤1:打开Windows命令提示符。
步骤2:使用“ -group”命令,例如:“ kafka-console-consumer -bootstrap-server localhost:9092 -topic -group < group_name> ”。给小组起个名字。按回车。
kafka消费者组cli

文章图片
在上面的快照中,组的名称为“ first_app”。可以看到没有显示任何消息,因为没有为此主题生成新消息。如果将使用“ -from-beginning”命令,则将显示所有先前的消息。
步骤3:要查看一些新消息,请从生产者控制台生成一些即时消息(如上一节所述)。
kafka消费者组cli

文章图片
因此,生产者产生的新消息可以在消费者的控制台中看到。
第4步:但是,只有一个消费者在该组中读取数据。让我们创建更多的消费者以了解消费者群体的力量。为此,请打开一个新终端并键入与以下命令完全相同的使用者命令:
‘ kafka-console-consumer.bat-引导服务器127.0.0.1:9092 – topic < 主题名称> – group < 组名称> ” 。
kafka消费者组cli

文章图片
kafka消费者组cli

文章图片
在上面的快照中,很明显生产者正在将数据发送到Kafka主题。两个使用者正在使用消息。查看消息的顺序。由于为“ myfirst”主题创建了三个分区(前面已经讨论过),因此仅按该顺序拆分消息。
我们可以在同一组中进一步创建更多使用者,每个使用者将根据分区数使用消息。尝试自己更好地理解。
注意:组ID应该相同,然后只有消息将在使用者之间分配。但是,如果终止任何使用者,则将分区重新分配给活动使用者,并且这些活动使用者将接收到消息。
因此,以这种方式,消费者组中的各个消费者都消费了来自Kafka主题的消息。
带Key的消费者生产者在数据中附加了键值后,它将被存储到该指定分区。如果未指定键值,则数据将移动到任何分区。因此,当消费者使用密钥读取消息时,如果未指定密钥,则该消息将显示为空。要使用来自Kafka主题的消息,需要使用’ print.key’ 和’ key.seperator’ 。使用的命令是:
‘ kafka-console-consumer -bootstrap-server localhost:9092 -topic < topic_name> – from-beginning -property print.key = true -property key.seperator =,’
使用上述命令,使用者可以使用指定的键读取数据。
有关消费者组的更多信息‘ – from-beginning’ 命令
该命令用于从头开始阅读消息(前面已经讨论过)。因此,在使用者组中使用它会产生以下输出:
kafka消费者组cli

文章图片
可以注意到,新的使用者组“ second_app”用于从头开始读取消息。如果再执行一次同一命令,它将不会显示任何输出。这是因为偏移量是在Apache Kafka中提交的。因此,一旦消费者组读取了所有直到写入的消息,下一次它将仅读取新消息。
例如,在下面的快照中,当再次使用“ -from-beginning”命令时,仅读取新消息。这是因为所有先前的消息仅在较早时使用。
kafka消费者组cli

文章图片
‘ kafka-consumer-groups’ 命令
此命令提供了整个文档,以列出所有组,描述组,删除使用者信息或重置使用者组偏移量。
kafka消费者组cli

文章图片
它需要引导服务器,客户端才能在使用者组上执行不同的功能。
列出消费者群体
“ -list”命令用于列出Kafka群集中可用的使用者组数量。该命令用作:
‘ kafka-consumer-groups.bat -bootstrap-server localhost:9092 -list’ 。
快照如下所示,其中存在三个消费者组。
kafka消费者组cli

文章图片
描述消费者群
“ – describe”命令用于描述使用者组。该命令用作:
‘ kafka-consumer-groups.bat-引导服务器-本地主机:9092-描述组< group_name> ’
kafka消费者组cli

文章图片
此命令描述是否存在任何活动的使用者,当前偏移值,滞后值为0-表示该使用者已读取所有数据。
重置偏移偏移在Apache Kafka中提交。因此,如果用户想再次阅读消息,则需要重置偏移值。 “ Kafka-consumer-groups”命令提供了一个重置??偏移量的选项。重置偏移值意味着定义用户要从其再次读取消息的位置。它一次仅支持一个消费者组,并且该组不应有任何活动实例。
重置偏移量时,用户需要选择三个参数:
  1. 执行选项
  2. 重置规格
  3. 范围
有两个执行选项可用:
‘ -dry-run’ :这是默认执行选项。此选项用于计划需要重置的偏移量。
-execute’ :此选项用于更新偏移值。
有以下可用的重置规格:
‘ -to-datetime’ :根据从datetime开始的偏移量重置偏移量。使用的格式为:“ YYYY-MM-DDTHH:mm:SS.sss”。
‘ – to-estally’ :将偏移量重置为最早的偏移量。
‘ – to-latest’ :将偏移量重置为最新的偏移量。
‘ – shift-‘ :通过将当前偏移值偏移’ n’ 来重置偏移。 “ n”的值可以为正或负。
‘ – from-file’ :将偏移量重置为CSV文件中定义的值。
‘ – to-current’ :将偏移量重置为当前偏移量。
可以定义两个范围:
‘ -all-topics’ :重置组中所有可用主题的偏移值。
‘ -topics’ :仅重置指定主题的偏移值。用户需要指定主题名称以重置偏移值。
让我们尝试看看:
1)使用“-至最早”命令
kafka消费者组cli

文章图片
在上面的快照中,偏移量被重置为新偏移量0。这是因为使用了“ -to-最早”命令,该命令已将偏移量值重置为0。
2)使用-shift-by命令
kafka消费者组cli

文章图片
kafka消费者组cli

文章图片
【kafka消费者组cli】在第一个快照中,偏移值从“ 0”移到“ 2”。在第二个中,偏移值从“ 2”转换为“ -1”。
注意:要将偏移值移为正数,没有必要使用” 符号。默认情况下,仅将其视为正数。

    推荐阅读