Using MacOS (Kafka installed by Brew) as example
1. Start Zookeeper and Kafka
Make sure zookeeper and Kafka are running before proceeding to next steps, otherwise you will get 'Broker may not be available' error.
Open a terminal, run
$ /usr/local/bin/zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg
Open another terminal, run
$ /usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties
2. Topics 1) Create new topic
Default is one partition. If you want to modify, edit config/server.properties or config/kraft/server.properties to num.partitions=3.
$ kafka-topics --bootstrap-server localhost:9092 --topic first_topic --create
文章图片
2) Create new topic specifying number of partitions
$ kafka-topics --bootstrap-server localhost:9092 --topic second_topic --create --partitions 3
3) Create new topic specifying number of partitions and replication factor
$ kafka-topics --bootstrap-server localhost:9092 --topic third_topic --create --partitions 3 --replication-factor 2
Note: I am getting an error because I only have one broker, so can't set the replication factor to >1. If we have a cluster (multiple brokers), we can set the replication factor to >1.
文章图片
4) List current topics
$ kafka-topics --bootstrap-server localhost:9092 --list
文章图片
5) Describe topics
$ kafka-topics --bootstrap-server localhost:9092 --topic first_topic --describe
文章图片
6) Delete topics
$ kafka-topics --bootstrap-server localhost:9092 --topic first_topic --delete
文章图片
3. Producers 1) Producing to existing topic
Enter producer console
$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic
【Kafka Series 3 - CLI (Command Line Interface)】Send any message.
Press ctrl+c to exit (for Mac terminal press ctrl+.)
文章图片
2) Producing to non existing topic
Kafka will create the topic automatically for you. Not recommended.
文章图片
3) Producing with keys
$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic --property parse.key=true --property key.separator=:
文章图片
If your message doesn't have a key, you will get error.
文章图片
4. Consumer 1) Consume what is being producing
Start a consumer
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic
Nothing is returning for now
文章图片
Now open a new terminal, start a producer, and send message
$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic
文章图片
You will see the message appear in the consumer terminal
文章图片
2) Consume history
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --from-beginning
文章图片
If you have the producer running, the new message will also show.
文章图片
文章图片
Note: Looks like the messages are not in the order of they being sent. It's because we set 3 partitions in the topic, messages are randomly sent to each partition, and they are only in order within each topic.
To display detailed message information
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=https://www.it610.com/article/true --from-beginning
文章图片
3) Consumer Group
Start one consumer, specifying group to my-first-application
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
Start another consumer in a new terminal, specifying to the same group:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
Start a producer and send some messages
$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic
You can see the consumers are splitting the messages
Producer:
文章图片
Consumer1:
文章图片
Consumer2:
文章图片
If you create a new consumer group my-second-application, then both consumer groups will receive the same messages.
list consumer groups
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
describe one specific group
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-second-application
文章图片
If you have a producer producing message, but no consumers consuming the message, there will be difference between CURRENT-OFFSET and LOG-END-OFFSET, which is the 'LAG'. After the consumer being created and starting to consume, the LAG will be removed to 0.
If you retrive history without specifying consumer group, it will create a temporary group for you
文章图片
文章图片
文章图片
override the group.id for kafka-console-consumer using
--group mygroup
5. Offset
Reset the offsets to the beginning of each partition
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic
shift offsets by 2 (forward) as another strategy (read 2 more messages from before)
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first_topic
shift offsets by 2 (backward) as another strategy
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic
6.Conduktor UI demo
文章图片