采用 Spring Integration Kafka实现高吞吐量的分布式发布订阅消息

采用 Spring Integration Kafka实现高吞吐量的分布式发布订阅消息
【服务端配置】:Apache官网下最新的二进制版本:kafka_2.11-2.3.0(内含有zookeeper)
(1)修改config/server.properties 里面的值:
listeners=PLAINTEXT://HOST_NAME(IP):9092 advertised.listeners=PLAINTEXT://HOST_NAME(IP):9092

其他配置默认
(2) 启动zookeeper:
./bin/zookeeper-server.start.sh ../config/zookeeper.properties &

(3)启动Kafka
./bin/kafka-server-start.sh ../config/server.properties &

(4)创建主题Topic -- myTopic
.bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic myTopic

(5) 查看服务端所创建的topic有哪些
.bin/kafka-topics.sh --list --zookeeper localhost:2181

【pom.xml】jar包依赖:
org.springframework.integration spring-integration-kafka 1.3.0.RELEASE

【生产消费(发布/订阅)配置】 :
spring-kafka-consumer.xml:
smallest10485760 52428801000

spring-kafka-producer.xml:
36000005kafka.serializer.StringEncodercom.wanrong.payment.commons.MyPartitioner-1【采用 Spring Integration Kafka实现高吞吐量的分布式发布订阅消息】52428801000020050010000org.apache.kafka.common.serialization.ByteArraySerializerorg.apache.kafka.common.serialization.ByteArraySerializer

【注意】: spring-kafka-consumer.xml的auto-startup设置为true 才能保证 消费端口接受到消息
【发消息接口 KafkaService】 :
package com.sunney.service; /** * 类KafkaService.java的实现描述:发消息接口类 * @author nazi */ public interface KafkaService { /** * 发消息 * @param topic 主题 * @param obj 发送内容 */ public void sendUserInfo(String topic, Object obj); }

对应实现类:
package com.sunney.service.impl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.kafka.support.KafkaHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Service; import com.sunney.service.KafkaService; /** * 类KafkaServiceImpl.java的实现描述:发消息实现类 * * @author nazi */ @Service("kafkaService") public class KafkaServiceImpl implements KafkaService {// 默认4个分区 private static final int numPartitions = 4; @Autowired @Qualifier("kafkaTopicTest") MessageChannelchannel; public void sendUserInfo(String topic, Object obj) { // 获取messageKey String messageKey = String.valueOf(obj.hashCode()); // 获取分区号 int partitionId = partition(messageKey, numPartitions); //partitionId = 0; channel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC, topic).setHeader(KafkaHeaders.MESSAGE_KEY, messageKey).setHeader(KafkaHeaders.PARTITION_ID, partitionId).build()); }/** * 获取分区号 * * @param key KEY * @param numPartitions 分区数 * @return */ private int partition(Object key, int numPartitions) { try { long partitionNum = Long.parseLong((String) key); return (int) Math.abs(partitionNum % numPartitions); } catch (Exception e) { return Math.abs(key.hashCode() % numPartitions); } }}

【消息接受类】:
package com.sunney.service.impl; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.sunney.service.KafkaConsumerService; import com.sunney.service.UserDto; /** * 类KafkaConsumerService.java的实现描述:消费接收类 * * @author nazi */ public class KafkaConsumerServiceImpl implements KafkaConsumerService{static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class); @Override public void processMessage(Map> msgs) { logger.info("===============================================processMessage==============="); for (Map.Entry> entry : msgs.entrySet()) { logger.info("============Topic:" + entry.getKey()); LinkedHashMap messages = (LinkedHashMap) entry.getValue(); Set keys = messages.keySet(); for (Integer i : keys) logger.info("======Partition:" + i); Collection values = messages.values(); for (Iterator iterator = values.iterator(); iterator.hasNext(); ) { String message = "["+iterator.next()+"]"; logger.info("=====message:" + message); List userList = JSON.parseArray(message, UserDto.class); logger.info("=====userList.size:" + userList.size()); for(UserDto userDto : userList){ logger.info("call userDto:" + userDto.toString()); }}} }}

【消费到的消息打印日志】:
采用 Spring Integration Kafka实现高吞吐量的分布式发布订阅消息
文章图片

项目发布地址: https://github.com/JnCaoxia/kafaDemo




    推荐阅读