知识为进步之母,而进步又为富强之源泉。这篇文章主要讲述微服务架构 | 8.1 使用 Spring Cloud Stream 整合 Apache kafka #yyds干货盘点#相关的知识,希望能为你提供帮助。
@[TOC](8.1 使用 Spring Cloud Stream 整合 Apache kafka)
前言参考资料:
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
Spring Cloud Stream是一个由注解驱动的框架,它允许开发人员在 Spring 应用程序中轻松地构建消息发布者和消费者;
Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决;
1. Spring Cloud Stream 基础知识
1.1 消息相关的四个组件
- 发射器 Source: 发射器序列化消息(默认为 JSON),并将消息发布到通道;
- 最终调用
source.output().send(MessageBuilder.withPayload(msg).build())
方法发送消息;
- 最终调用
- 通道 Channel:通道是对队列的一个抽象,它将在消息生产者发布消息或消息消费者消费消息后保留该消息。通道位于消息队列之上;
- 绑定器 Binder:其允许开发人员处理消息,而不必依赖于特定于平台的库和 API 来发布和消费消息;
- 接收器 Sink:接收器监昕传入消息的通道, 并将消息反序列化为 POJO;
文章图片
1.2 本例说明
- 在本篇的代码示例中,消息生产者称为组织服务,消息消费者称为许可证服务;
- 可以理解成组织服务需要对许可证进行 CRUD 操作,需要发送消息通知许可证服务,许可证服务监听消息,做出反应;
- 在本例中:
- 消息发送方 = 生产者 = output = Source;
- 消息接收方 = 消费者 = input = Sink;
文章图片
- 示例说明:每当添加、更新或删除记录时,组织服务就将向 orgChange Topic发布消息,并且许可证服务从同一主题接收消息;
<
dependency>
<
groupid>
org.springframework.cloud<
/groupid>
<
artifactid>
spring-cloud-stream<
/artifactid>
<
/dependency>
<
dependency >
<
groupid>
org.springframework.cloud<
/groupid>
<
artifactid>
spring-cloud-starter-stream-kafka<
/artifactid>
<
/dependency>
2.2 修改 yaml 配置文件
spring:
cloud:
#所需配置的开始
stream:
bindings:
#output 是通道的名称,映射到 source.output() 通道
output:
#写入消息的消息队列的名称,消息发送的目的地
destination: orgChangeTopic
#将要发送和接收消息的类型提示(还可以是xml、Avro),发送方要指定格式
content-type: application/json
#使用 kafka 作为消息总线,可以替换成 RabbitMQ
kafka:
binder:
#Kafka 消息代理的网络位置
zkNodes: localhost
#运行着 Kafka 的 Apache ZooKeeper 服务器的网络位置
brokers: localhost
2.3 主程序类上添加注解@EnableBinding(Source.class):告诉 Spring Cloud Stream,该服务将通过在 Source 类上定义的一组通道与消息代理进行通信;(也可以不添加在主程序类上)
2.4 编写消息生产者
@Component
public class SimpleSourceBean
private Source source;
private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);
//注入一个 Source 接口,以供服务使用。该接口表示通道
@Autowired
public SimpleSourceBean(Source source)
this.source = source;
public void publishOrgChange(String action,String orgId)
logger.debug("Sending Kafka messagefor Organization Id: ", action, orgId);
OrganizationChangeModel change =new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action,
orgId,
UserContext.getCorrelationId());
//要发布的消息是一个 java POJO//当准备发送消息时,使用 Source 类中定义的通道的 send() 方法
source.output().send(MessageBuilder.withPayload(change).build());
output()
方法返回一个 MessageChannel 类,代表如何将消息发送给消息代理;- 消息的实际发布发生在
publishOrgChange()
方法中。此方法构建一个名为 OrganizationChangeModel 的 Java POJO。其有 3 个数据元素:- action:触发事件的动作;
- orgId:与事件关联的组织 ID;
- correlationId:触发事件的服务调用的关联 ID(用于跟踪和调试流经服务的消息流);
send()
方法接收一个 Spring Message 类;可以们使用一个名为 MessageBuilder 的 Spring 辅助类来接收 OrganizationChangeModel 类的内容,并将它转换为 Spring Message 类;
@Service
public class OrganizationService
@Autowired
private OrganizationRepository orgRepository;
//自动装配上述写的 SimpleSourceBean 类
@Autowired
SimpleSourceBean simpleSourceBean;
public Organization getOrg(String organizationId)
return orgRepository.findById(organizationId);
public void saveOrg(Organization org)
org.setId( UUID.randomUUID().toString());
orgRepository.save(org);
//对服务中修改数据的每一个方法,调用 simpleSourceBean.publishOrgChange(),下面同理
simpleSourceBean.publishOrgChange("SAVE", org.getId());
public void updateOrg(Organization org)
orgRepository.save(org);
simpleSourceBean.publishOrgChange("UPDATE", org.getId());
public void deleteOrg(String orgId)
orgRepository.delete( orgId );
simpleSourceBean.publishOrgChange("DELETE", orgId);
- 对于修改组织数据的方法,需要放进消息通道里;
文章图片
3.1 添加 pom.xml 依赖
<
dependency>
<
groupid>
org.springframework.cloud<
/groupid>
<
artifactid>
spring-cloud-stream<
/artifactid>
<
/dependency>
<
dependency >
<
groupid>
org.springframework.cloud<
/groupid>
<
artifactid>
spring-cloud-starter-stream-kafka<
/artifactid>
<
/dependency>
3.2 修改 yaml 配置文件
spring:
cloud:
stream:
bindings:
input:
#将 input 属性映射到 orgChangeTopic 队列中
destination: orgChangeTopic
content-type: application/json
#保证同一个消费者组收到相同的消息,定义将要消费消息的消费者的名称
group: licensingGroup
binder:
zkNodes: localhost
brokers: localhost
input
属性值 orgChangeTopic,将映射到下面代码的 Sink.INPUT 通道;
3.4 编写消息消费者
//每次收到 input 通道的消息时,Spring Cloud Stream 将执行 loggerSink() 方法
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChangeModel orgChange)
logger.debug("Received an event for organization id ", orgChange.getOrganizationId());
4. 分布式缓存示例(主要实现的是消费者的逻辑)
- 示例说明:
- 许可证服务(消费者)始终检查分布式的 Redis 缓存以获取与特定许可证相关联的组织数据。 如果组织数据在缓存中存在,那么将从缓存中返回数据。否则,将调用组织服务(生产者),并将调用的结果缓存在一个 Redis 散列中;
- 在组织服务(生产者)中更新数据时,组织服务(生产者)将向 Kafka 发出一条消息。许可证服务(消费者)将接收消息,并对 Redis 发出删除指令,以清除缓存;
<
!--Spring Data Redis dependencies-->
<
dependency>
<
groupId>
org.springframework.data<
/groupId>
<
artifactId>
spring-data-redis<
/artifactId>
<
version>
1.7.4.RELEASE<
/version>
<
/dependency>
<
dependency>
<
groupId>
redis.clients<
/groupId>
<
artifactId>
jedis<
/artifactId>
<
version>
2.9.0<
/version>
<
/dependency>
<
dependency>
<
groupId>
org.apache.commons<
/groupId>
<
artifactId>
commons-pool2<
/artifactId>
<
version>
2.0<
/version>
<
/dependency>
4.2 在主程序类中公开 JedisConnectionFactory 作为 Spring bean
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker //断路器
@EnableBinding(Sink.class)
public class Application @Autowired
private ServiceConfig serviceConfig;
...//该方法设置到 Redis 服务器的实际数据库连接
@Bean
public JedisConnectionFactory jedisConnectionFactory()
JedisConnectionFactory jedisConnFactory = new JedisConnectionFactory();
jedisConnFactory.setHostName( serviceConfig.getRedisServer());
jedisConnFactory.setPort( serviceConfig.getRedisPort() );
return jedisConnFactory;
//该方法创建一个 RedisTemplate,用于对 Redis 服务器执行操作
@Bean
public RedisTemplate<
String, Object>
redisTemplate()
RedisTemplate<
String, Object>
template = new RedisTemplate<
String, Object>
();
template.setConnectionFactory(jedisConnectionFactory());
return template;
public static void main(String[] args)
SpringApplication.run(Application.class, args);
4.3 定义访问 Redis 的接口
public interface OrganizationRedisRepository
void saveOrganization(Organization org);
void updateOrganization(Organization org);
void deleteOrganization(String organizationId);
Organization findOrganization(String organizationId);
- 该接口将被注入到任何需要访问 Redis 的服务中;
@Repository//表示该类是一个与 Spring Data 一起使用的存储库类
public class OrganizationRedisRepositoryImpl implements OrganizationRedisRepository
//在 Redis 服务器中存储组织数据的散列的名称
private static final String HASH_NAME ="organization";
private RedisTemplate<
String, Organization>
redisTemplate;
//该类包含一组用于在 Redis 服务器上执行数据操作的辅助方法
private HashOperations hashOperations;
public OrganizationRedisRepositoryImpl()
super();
@Autowired
private OrganizationRedisRepositoryImpl(RedisTemplate redisTemplate)
this.redisTemplate = redisTemplate;
@PostConstruct
private void init()
hashOperations = redisTemplate.opsForHash();
//与 Redis 的所有交互都将使用由键存储的单个 Organization 对象
@Override
public void saveOrganization(Organization org)
hashOperations.put(HASH_NAME, org.getId(), org);
@Override
public void updateOrganization(Organization org)
hashOperations.put(HASH_NAME, org.getId(), org);
@Override
public void deleteOrganization(String organizationId)
hashOperations.delete(HASH_NAME, organizationId);
@Override
public Organization findOrganization(String organizationId)
return (Organization) hashOperations.get(HASH_NAME, organizationId);
- 使用上述 4.2 中定义的 RedisTemplate 与 Redis 服务器交互,并对其操作;
@Component
public class OrganizationRestTemplateClient
@Autowired
RestTemplate restTemplate;
//消费者需要访问 Redis,因此需要注入 OrganizationRedisRepository 接口
@Autowired
OrganizationRedisRepository orgRedisRepo;
private static final Logger logger = LoggerFactory.getLogger(OrganizationRestTemplateClient.class);
private Organization checkRedisCache(String organizationId)
try
//尝试使用组织 ID 从 Redis 中检索 Organization 类
return orgRedisRepo.findOrganization(organizationId);
catch (Exception ex)
logger.error("Error encountered while trying to retrieve organizationcheck Redis Cache.Exception ", organizationId, ex);
return null;
private void cacheOrganizationObject(Organization org)
try
orgRedisRepo.saveOrganization(org);
catch (Exception ex)
logger.error("Unable to cache organizationin Redis. Exception ", org.getId(), ex);
public Organization getOrganization(String organizationId)
logger.debug("In Licensing Service.getOrganization: ", UserContext.getCorrelationId());
Organization org = checkRedisCache(organizationId);
//如果无法从 Redis 中检索出数据,将调用组织服务从源数据库检索数据
if (org!=null)
logger.debug("I have successfully retrieved an organizationfrom the redis cache: ", organizationId, org);
return org;
logger.debug("Unable to locate organization from the redis cache: .", organizationId);
ResponseEntity<
Organization>
restExchange =
restTemplate.exchange(
"http://zuulservice/api/organization/v1/organizations/organizationId",
HttpMethod.GET,
null, Organization.class, organizationId);
/* 将记录保存到缓存中 */
org = restExchange.getBody();
if (org!=null)
//将检索到的对象保存到缓存中
cacheOrganizationObject(org);
return org;
- 在与缓存交互时需要注意异常的处理;
@EnableBinding(Sink.class)//使用默认
public class OrganizationChangeHandler @Autowired
private OrganizationRedisRepository organizationRedisRepository;
private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);
@StreamListener(Sink.INPUT)//使用默认
public void loggerSink(OrganizationChangeModel orgChange)
logger.debug("Received a message of type " + orgChange.getType());
//消费者收到消息后,检查与数据相关的操作,然后做出相应反应
switch(orgChange.getAction())
case "GET":
logger.debug("Received a GET event from the organization service for organization id ", orgChange.getOrganizationId());
break;
case "SAVE":
logger.debug("Received a SAVE event from the organization service for organization id ", orgChange.getOrganizationId());
break;
case "UPDATE":
logger.debug("Received a UPDATE event from the organization service for organization id ", orgChange.getOrganizationId());
//如果组织数据被更新或删除,就通过 OrganizationRedisRepository 类从 Redis 中删除组织数据
organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
break;
case "DELETE":
logger.debug("Received a DELETE event from the organization service for organization id ", orgChange.getOrganizationId());
organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId());
break;
default:
logger.error("Received an UNKNOWN event from the organization service of type ", orgChange.getType());
break;
5. [可选] 自定义通道(以消息消费者为例) 5.1 定义接口
public interface CustomChannels
@Input("inboundOrgChanges")//方法级注解,定义通道名称
SubscribableChannel orgs();
//通过 @Input 注解公开的每个通道必须返回一个 SubscribableChannel 类
- 同理,消息生产者要定义与实现的接口如下:
@OutputChannel("outboundOrg")
MessageChannel outboundOrg();
5.2 修改 yaml 配置文件
spring:
cloud:
stream:
bindings:
#将 input 改为 inboundOrgChanges
inboundOrgChanges:
destination: orgChangeTopic
content-type: application/json
group: licensingGroup
kafka:
binder:
zkNodes: localhost
brokers: localhost
5.3 注入自定义的接口
@EnableBinding(CustomChannels.class)//使用 CustomChannels 作为参数注入而不是 Sink
public class OrganizationChangeHandler ...//@StreamListener 注解传入通道名称而不是 Sink.INPUT
@StreamListener("inboundOrgChanges")
public void loggerSink(OrganizationChangeModel orgChange)
...
最后::: hljs-center
新人制作,如有错误,欢迎指出,感激不尽!
:::
::: hljs-center
欢迎关注公众号,会分享一些更日常的东西!
:::
::: hljs-center
如需转载,请标注出处!
:::
::: hljs-center
文章图片
【微服务架构 | 8.1 使用 Spring Cloud Stream 整合 Apache kafka #yyds干货盘点#】:::
推荐阅读
- 微服务架构 | 6.1 使用 Zuul 进行服务路由 #yyds干货盘点#
- Java之集合
- 预处理器
- 如何利用MHA+ProxySQL实现读写分离和负载均衡
- Flink处理函数实战之四(窗口处理)
- Java多线程与线程池技术
- MapReduce编程模型和计算框架
- 三高Mysql - Mysql索引和查询优化(偏实战部分)
- 浅析$nextTick和$forceUpdate