Implementing High-Throughput Distributed Publish/Subscribe Messaging with Spring Integration Kafka
[Server configuration]: download the latest binary release from the Apache site: kafka_2.11-2.3.0 (ZooKeeper is bundled).
(1) Edit the following values in config/server.properties:
listeners=PLAINTEXT://<hostname-or-IP>:9092
advertised.listeners=PLAINTEXT://<hostname-or-IP>:9092
Leave the other settings at their defaults.
(2) Start ZooKeeper (from the Kafka install root):
./bin/zookeeper-server-start.sh config/zookeeper.properties &
(3) Start Kafka:
./bin/kafka-server-start.sh config/server.properties &
(4) Create the topic -- myTopic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic myTopic
(5) List the topics that exist on the broker:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
[pom.xml] dependency:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
[Producer/consumer (publish/subscribe) configuration]:
spring-kafka-consumer.xml -- the original XML markup did not survive extraction; the recoverable values are an offset reset of smallest and the numbers 10485760, 5242880, and 1000.
spring-kafka-producer.xml -- likewise garbled; the recoverable fragments are the values 3600000 and 5, the encoder kafka.serializer.StringEncoder, the partitioner class com.wanrong.payment.commons.MyPartitioner, a -1 setting, a further run of numeric values (5242880, 10000, 200, 500, 10000), and org.apache.kafka.common.serialization.ByteArraySerializer as both key and value serializer.
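Because the two XML files above were garbled when the article was published, here is a minimal sketch of what a spring-integration-kafka 1.x consumer/producer configuration typically looks like. The element and attribute names follow the project's 1.x XML-namespace samples; the addresses, group id, poller settings, and channel names are assumptions, so verify everything against the 1.3.0.RELEASE reference documentation before use.

```xml
<!-- Sketch only: names follow the spring-integration-kafka 1.x samples. -->
<!-- spring-kafka-consumer.xml -->
<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
    zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000"/>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
    kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="inputFromKafka">
    <int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext" consumer-timeout="1000"
    zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="myGroup" max-messages="5000">
            <int-kafka:topic id="myTopic" streams="4"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<!-- spring-kafka-producer.xml -->
<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092" topic="myTopic"
            key-class-type="java.lang.String" value-class-type="java.lang.String"
            compression-codec="default"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
    kafka-producer-context-ref="kafkaProducerContext" channel="kafkaTopicTest"/>
```

The channel name kafkaTopicTest matches the @Qualifier used by the service class later in the article; everything else here is illustrative.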
[Note]: auto-startup on the inbound adapter in spring-kafka-consumer.xml must be set to true, otherwise the consumer side never receives messages.
[Message-sending interface, KafkaService]:
package com.sunney.service;

/**
 * KafkaService.java: the message-sending interface.
 *
 * @author nazi
 */
public interface KafkaService {

    /**
     * Send a message.
     *
     * @param topic the target topic
     * @param obj   the payload to send
     */
    void sendUserInfo(String topic, Object obj);
}
The corresponding implementation class:
package com.sunney.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.sunney.service.KafkaService;

/**
 * KafkaServiceImpl.java: the message-sending implementation.
 *
 * @author nazi
 */
@Service("kafkaService")
public class KafkaServiceImpl implements KafkaService {

    // Four partitions by default; must match the partition count of the topic.
    private static final int numPartitions = 4;

    @Autowired
    @Qualifier("kafkaTopicTest")
    MessageChannel channel;

    public void sendUserInfo(String topic, Object obj) {
        // Derive the message key from the payload's hash code.
        String messageKey = String.valueOf(obj.hashCode());
        // Compute the target partition from the key.
        int partitionId = partition(messageKey, numPartitions);
        channel.send(MessageBuilder.withPayload(obj)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.MESSAGE_KEY, messageKey)
                .setHeader(KafkaHeaders.PARTITION_ID, partitionId)
                .build());
    }

    /**
     * Map a key to a partition number.
     *
     * @param key           the message key
     * @param numPartitions the number of partitions
     * @return a partition index in [0, numPartitions)
     */
    private int partition(Object key, int numPartitions) {
        try {
            long partitionNum = Long.parseLong((String) key);
            return (int) Math.abs(partitionNum % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}
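The partition() routine above can be exercised on its own. The sketch below reproduces its logic outside Spring (numeric keys are taken modulo the partition count, anything else falls back to hashCode); the class name and sample keys are illustrative only:

```java
public class PartitionDemo {

    // Same logic as KafkaServiceImpl.partition(): numeric keys use the parsed
    // value, non-numeric keys fall back to hashCode(); both are folded into
    // [0, numPartitions) with Math.abs.
    static int partition(Object key, int numPartitions) {
        try {
            long partitionNum = Long.parseLong((String) key);
            return (int) Math.abs(partitionNum % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }

    public static void main(String[] args) {
        System.out.println(partition("123", 4));    // numeric key: 123 % 4 = 3
        System.out.println(partition("-6", 4));     // negative numeric key: |-6 % 4| = 2
        System.out.println(partition("user-a", 4)); // non-numeric key: hashCode fallback
    }
}
```

One caveat worth noting: Math.abs(Integer.MIN_VALUE) is still negative in Java, so a hash that happens to be Integer.MIN_VALUE would yield a negative partition; production partitioners usually mask instead, e.g. (key.hashCode() & 0x7fffffff) % numPartitions.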
[Message-receiving class]:
package com.sunney.service.impl;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.sunney.service.KafkaConsumerService;
import com.sunney.service.UserDto;

/**
 * KafkaConsumerServiceImpl.java: the message-receiving class.
 *
 * @author nazi
 */
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    static final Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class);

    @Override
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        logger.info("===== processMessage =====");
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            logger.info("Topic: " + entry.getKey());
            // Partition number -> message payload for this topic.
            LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue();
            Set<Integer> keys = messages.keySet();
            for (Integer partition : keys) {
                logger.info("Partition: " + partition);
            }
            Collection<String> values = messages.values();
            for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) {
                // Wrap the single JSON object in brackets so it parses as a JSON array.
                String message = "[" + iterator.next() + "]";
                logger.info("message: " + message);
                List<UserDto> userList = JSON.parseArray(message, UserDto.class);
                logger.info("userList.size: " + userList.size());
                for (UserDto userDto : userList) {
                    logger.info("userDto: " + userDto.toString());
                }
            }
        }
    }
}
(The generic type parameters were lost in publishing; Map<String, Map<Integer, String>> is inferred from how the payload is cast and iterated.)
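The payload shape the inbound adapter hands to processMessage() -- topic mapped to partition mapped to message -- can be mimicked with plain collections. The sketch below uses hypothetical topic and message values to show the same two-level iteration, leaving out the fastjson parsing step:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PayloadWalkDemo {

    // Flatten a topic -> (partition -> message) map into "topic/partition:message"
    // lines, in the same nested order processMessage() walks the payload.
    static List<String> flatten(Map<String, Map<Integer, String>> msgs) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            for (Map.Entry<Integer, String> part : entry.getValue().entrySet()) {
                out.add(entry.getKey() + "/" + part.getKey() + ":" + part.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical payload: one topic, two partitions, one JSON message each.
        Map<Integer, String> partitions = new LinkedHashMap<>();
        partitions.put(0, "{\"name\":\"nazi\"}");
        partitions.put(1, "{\"name\":\"sunney\"}");
        Map<String, Map<Integer, String>> msgs = new LinkedHashMap<>();
        msgs.put("myTopic", partitions);
        for (String line : flatten(msgs)) {
            System.out.println(line);
        }
    }
}
```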
[Log output of the consumed messages]:
![Log output of the consumed messages](https://img.it610.com/image/info8/f5cce666ae49448da6ceea860af89b2f.jpg)
Project source code: https://github.com/JnCaoxia/kafaDemo