Kafka | Testing a Kafka Producer and Consumer in Java

1. Environment Setup
See the previous article for how to deploy and install Kafka. Once the installation succeeds, start Kafka.
Then add the kafka-clients dependency in Gradle:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'

2. Creating the Producer
The producer writes to the topic test.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        String topicName = "test";

        // Producer configuration
        Properties props = new Properties();
        // Address of the Kafka broker
        props.put("bootstrap.servers", "192.168.3.45:9092");
        // Require acknowledgement from all in-sync replicas for each request
        props.put("acks", "all");
        // Number of automatic retries for a failed request; 0 disables retrying
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Wait up to 1 ms so records can be batched, which reduces the number of requests
        props.put("linger.ms", 1);
        // Total memory (in bytes) available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send ten records whose key and value are both the loop index
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
        }
        // send() is asynchronous; close() below waits for outstanding sends to complete
        System.out.println("Message sent successfully");
        producer.close();
    }
}

3. Creating the Consumer

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        String topicName = "test";

        // Kafka consumer configuration settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.3.45:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the list of topics
        consumer.subscribe(Arrays.asList(topicName));
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            // Wait up to 100 ms for new records (poll(long) is the API in kafka-clients 1.1.0)
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset, key and value of each consumer record
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
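Both programs hard-code the same broker address and repeat their settings inline. One way to keep them in sync is a small shared helper that builds the two Properties objects. The following is only a sketch: the class name KafkaTestConfig and the kafka.bootstrap system property are hypothetical names chosen for illustration, not part of the original article or of Kafka. The configuration keys and values are the ones used above, written as strings, which the Kafka clients also accept for numeric settings.

```java
import java.util.Properties;

/** Hypothetical helper (not from the original article) that centralizes client settings. */
final class KafkaTestConfig {
    // Broker address used in the article; adjust to your own environment.
    static final String DEFAULT_BOOTSTRAP = "192.168.3.45:9092";

    private KafkaTestConfig() {}

    /** Reads the broker address from the assumed -Dkafka.bootstrap property, else the default. */
    static String bootstrapServers() {
        return System.getProperty("kafka.bootstrap", DEFAULT_BOOTSTRAP);
    }

    /** Same producer settings as SimpleProducer, expressed as string values. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "1");
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** Same consumer settings as SimpleConsumer, with the group id as a parameter. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With such a helper, the producer could be created as new KafkaProducer<>(KafkaTestConfig.producerProps(KafkaTestConfig.bootstrapServers())), and the consumer in the same way with consumerProps, so switching brokers only requires passing a different -Dkafka.bootstrap value.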

4. Testing
Start the consumer program SimpleConsumer first; it will block while waiting for messages. Then start the producer program SimpleProducer, and the consumer window will show the received messages:

Subscribed to topic test
offset = 17, key = 0, value = 0
offset = 18, key = 1, value = 1
offset = 19, key = 2, value = 2
offset = 20, key = 3, value = 3
offset = 21, key = 4, value = 4
offset = 22, key = 5, value = 5
offset = 23, key = 6, value = 6
offset = 24, key = 7, value = 7
offset = 25, key = 8, value = 8
offset = 26, key = 9, value = 9

P.S. If the consumer does not receive any messages, see the article "Allowing Remote Client Connections to the Kafka Server".
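Before changing any broker configuration, it can help to rule out a plain network problem first. The sketch below uses only the Java standard library to check whether a TCP connection to the broker's host and port can be opened at all. The class name BrokerReachability is hypothetical, and the default host and port are simply the ones used in this article. A successful connection only shows that the port is open; it does not verify the broker's listener configuration, so the consumer may still fail for the reasons covered in the article mentioned above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper; it does not use the Kafka client library. */
final class BrokerReachability {
    private BrokerReachability() {}

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker address used in this article
        String host = args.length > 0 ? args[0] : "192.168.3.45";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Run it from the same machine as SimpleConsumer. If it reports that the port is not reachable, check the network path and firewall before looking at the Kafka settings.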
