本文概述
- Spring Integration
- Redis
- Redis与Spring Integration
- 总结
这些微服务之间的通信对于拥有稳定且可扩展的系统很重要。有多种方法可以做到这一点。基于消息的通信是可靠地执行此操作的一种方法。
使用消息传递时, 组件通过异步交换消息相互交互。消息通过渠道交换。
文章图片
当服务A要与服务B进行通信时, 不是直接发送它, 而是将它发送到特定的通道。当服务B要读取消息时, 它会从特定的消息通道中提取消息。
在本Spring Integration教程中, 你将学习如何使用Redis在Spring应用程序中实现消息传递。你将看到一个示例应用程序, 其中一个服务正在推送队列中的事件, 而另一服务正在逐个处理这些事件。
Spring Integration Spring Integration项目扩展了Spring框架, 以支持基于Spring的应用程序之间或内部的消息传递。组件通过消息传递范例连接在一起。各个组件可能不知道应用程序中的其他组件。
Spring Integration提供了多种与外部系统进行通信的机制。通道适配器是一种用于单向集成(发送或接收)的机制。网关用于请求/答复方案(入站或出站)。
Apache Camel是广泛使用的替代方法。在现有的基于Spring的服务中, 通常首选Spring集成, 因为它是Spring生态系统的一部分。
Redis Redis是一个非常快的内存数据存储。它也可以选择持久化到磁盘上。它支持不同的数据结构, 例如简单的键值对, 集合, 队列等。
将Redis用作队列可以使组件之间的数据共享和水平缩放变得更加容易。一个生产者或多个生产者可以将数据推送到队列, 而一个消费者或多个消费者可以拉出数据并处理事件。
多个使用者不能使用同一事件-这确保一个事件被处理一次。
文章图片
使用Redis作为消息队列的好处:
- 以非阻塞方式并行执行离散任务
- 很棒的演出
- 稳定性
- 易于监控和调试
- 易于实施和使用
- 将任务添加到队列应该比处理任务本身更快。
- 消费任务应该比生产任务快(如果没有, 则增加更多的消费者)。
假设你有一个允许用户发布帖子的应用程序。你想构建一个关注功能。另一个要求是, 每次有人发布帖子时, 都应该通过某种交流渠道(例如, 电子邮件或推送通知)通知所有关注者。
一种实现方法是在用户发布内容后向每个关注者发送电子邮件。但是, 当用户有1000个关注者时会发生什么?当有1000位用户在10秒内发布内容时, 每个人都有1000位关注者?另外, 发布者的帖子会等到所有电子邮件都发送完之后吗?
分布式系统解决了此问题。
通过使用队列可以解决此特定问题。负责发布帖子的服务A(生产者)将做到这一点。它将发布帖子并推送事件, 其中包含需要接收电子邮件的用户列表以及帖子本身。可以在服务B中获取用户列表, 但是为了简化本示例, 我们将从服务A发送用户列表。
这是一个异步操作。这意味着正在发布的服务将不必等待发送电子邮件。
服务B(使用者)将从队列中提取事件并进行处理。这样, 我们可以轻松地扩展我们的服务, 并且可以让n个消费者发送电子邮件(处理事件)。
因此, 让我们从生产者服务中的实现开始。必需的依赖项是:
<
dependency>
<
groupId>
redis.clients<
/groupId>
<
artifactId>
jedis<
/artifactId>
<
/dependency>
<
dependency>
<
groupId>
org.springframework.data<
/groupId>
<
artifactId>
spring-data-redis<
/artifactId>
<
/dependency>
<
dependency>
<
groupId>
org.springframework.integration<
/groupId>
<
artifactId>
spring-integration-redis<
/artifactId>
<
/dependency>
这三个Maven依赖项是必需的:
- Jedis是Redis的客户。
- Spring Data Redis依赖性使在Java中使用Redis更加容易。它提供了熟悉的Spring概念, 例如用于核心API使用的模板类和轻量级存储库样式的数据访问。
- Spring Integration Redis提供了Spring编程模型的扩展, 以支持著名的Enterprise Integration Patterns。
@Configuration
public class RedisConfig {@Value("${redis.host}")
private String redisHost;
@Value("${redis.port:6379}")
private int redisPort;
@Bean
public JedisPoolConfig poolConfig() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(128);
return poolConfig;
}@Bean
public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) {
final JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
connectionFactory.setHostName(redisHost);
connectionFactory.setPort(redisPort);
connectionFactory.setPoolConfig(poolConfig);
connectionFactory.setUsePool(true);
return connectionFactory;
}
}
注释@Value表示Spring将在应用程序属性中定义的值注入到字段中。这意味着应在应用程序属性中定义redis.host和redis.port值。
现在, 我们需要定义要发送到队列的消息。一个简单的示例消息如下所示:
@Getter
@Setter
@Builder
public class PostPublishedEvent {private String postUrl;
private String postTitle;
private List<
String>
emails;
}
注意:Project Lombok(https://projectlombok.org/)提供了@ Getter, @ Setter, @ Builder和许多其他注释, 以避免使用getter, setter和其他琐碎的东西使代码混乱。你可以从此srcmini文章中了解更多信息。
消息本身将以JSON格式保存在队列中。每次将事件发布到队列时, 消息都会序列化为JSON。当从队列中消费时, 该消息将被反序列化。
定义完消息后, 我们需要定义队列本身。在Spring Integration中, 可以通过.xml配置轻松完成此操作。该配置应放在resources / WEB-INF目录中。
<
?xml version="1.0" encoding="UTF-8"?>
<
beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
xsi:schemaLocation="http://www.springframework.org/schema/integration/redis
http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<
int-redis:queue-outbound-channel-adapter
id="event-outbound-channel-adapter"
channel="eventChannelJson"
serializer="serializer"
auto-startup="true" connection-factory="redisConnectionFactory"
queue="my-event-queue" />
<
int:gateway id="eventChannelGateway"
service-interface="org.srcmini.queue.RedisChannelGateway"
error-channel="errorChannel" default-request-channel="eventChannel">
<
int:default-header name="topic" value="http://www.srcmini.com/queue"/>
<
/int:gateway>
<
int:channel id="eventChannelJson"/>
<
int:channel id="eventChannel"/>
<
bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
<
int:object-to-json-transformer input-channel="eventChannel"
output-channel="eventChannelJson"/>
<
/beans>
在配置中, 你可以看到” int-redis:queue-outbound-channel-adapter” 部分。其属性是:
- id:组件的bean名称。
- channel:此终结点从其接收消息的MessageChannel。
- connection-factory:对RedisConnectionFactory bean的引用。
- queue:Redis列表的名称, 在该列表上执行基于队列的推送操作以发送Redis消息。此属性与队列表达式互斥。
- queue-expression:一个SpEL表达式, 用于在运行时使用传入消息作为#root变量来确定Redis列表的名称。此属性与队列互斥。
- serializer:RedisSerializer bean引用。默认情况下, 它是一个JdkSerializationRedisSerializer。但是, 对于String有效负载, 如果未提供序列化程序引用, 则使用StringRedisSerializer。
- extract-payload:指定此端点是否应仅将负载发送到Redis队列或整个消息。其默认值为true。
- left-push:指定此端点应使用向左推送(为true时)还是向右推送(为false时)将消息写入Redis列表。如果为true, 则与默认Redis队列入站通道适配器一起使用时, Redis列表将充当FIFO队列。设置为false可与从列表中读取并弹出左键的软件一起使用, 或实现类似堆栈的消息顺序。其默认值为true。
StringRedisSerializer用于在Redis中保存之前对消息进行序列化。同样在.xml配置中, 我们定义了网关并将RedisChannelGateway设置为网关服务。这意味着可以将RedisChannelGateway Bean注入其他Bean中。我们定义了属性default-request-channel, 因为还可以通过使用@Gateway批注提供按方法的频道引用。类定义:
public interface RedisChannelGateway {
void enqueue(PostPublishedEvent event);
}
要将配置连接到我们的应用程序中, 我们必须将其导入。这是在SpringIntegrationConfig类中实现的。
@ImportResource("classpath:WEB-INF/event-queue-config.xml")
@AutoConfigureAfter(RedisConfig.class)
@Configuration
public class SpringIntegrationConfig {
}
@ImportResource批注用于将Spring .xml配置文件导入@Configuration。 @AutoConfigureAfter注释用于提示应在其他指定的自动配置类之后应用自动配置。
现在, 我们将创建一个服务并实现将事件加入Redis队列的方法。
public interface QueueService {void enqueue(PostPublishedEvent event);
}
@Service
public class RedisQueueService implements QueueService {private RedisChannelGateway channelGateway;
@Autowired
public RedisQueueService(RedisChannelGateway channelGateway) {
this.channelGateway = channelGateway;
}@Override
public void enqueue(PostPublishedEvent event) {
channelGateway.enqueue(event);
}
}
现在, 你可以使用QueueService中的enqueue方法轻松地将消息发送到队列。
Redis队列只是具有一个或多个生产者和消费者的列表。要将消息发布到队列, 生产者可以使用LPUSH Redis命令。并且, 如果你监视Redis(提示:输入redis-cli monitor), 则可以看到消息已添加到队列中:
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\", \"postTitle\":\"test\", \"emails\":[\"test\"]}"
现在, 我们需要创建一个使用者应用程序, 它将从队列中提取这些事件并进行处理。消费者服务需要与生产者服务相同的依赖关系。
现在, 我们可以重用PostPublishedEvent类来反序列化消息。
我们需要创建队列配置, 同样, 它必须放置在resources / WEB-INF目录中。队列配置的内容是:
<
?xml version="1.0" encoding="UTF-8"?>
<
beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-redis="http://www.springframework.org/schema/integration/redis"
xsi:schemaLocation="http://www.springframework.org/schema/integration/redis
http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<
int-redis:queue-inbound-channel-adapter id="event-inbound-channel-adapter"
channel="eventChannelJson" queue="my-event-queue"
serializer="serializer" auto-startup="true"
connection-factory="redisConnectionFactory"/>
<
int:channel id="eventChannelJson"/>
<
int:channel id="eventChannel">
<
int:queue/>
<
/int:channel>
<
bean id="serializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
<
int:json-to-object-transformer input-channel="eventChannelJson"
output-channel="eventChannel"
type="com.srcmini.integration.spring.model.PostPublishedEvent"/>
<
int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService"
method="process">
<
int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/>
<
/int:service-activator>
<
/beans>
在.xml配置中, int-redis:queue-inbound-channel-adapter可以具有以下属性:
- id:组件的bean名称。
- channel:我们从此端点向其发送消息的MessageChannel。
- 自动启动:SmartLifecycle属性, 用于指定此端点是否应在应用程序上下文启动后自动启动。其默认值为true。
- 阶段:SmartLifecycle属性, 用于指定将启动此端点的阶段。其默认值为0。
- connection-factory:对RedisConnectionFactory bean的引用。
- queue:Redis列表的名称, 在该列表上执行基于队列的弹出操作以获取Redis消息。
- 错误通道:我们将从端点的侦听任务向其发送带有异常的ErrorMessages的MessageChannel。默认情况下, 基础MessagePublishingErrorHandler使用应用程序上下文中的默认errorChannel。
- serializer:RedisSerializer Bean参考。它可以是一个空字符串, 表示没有序列化器。在这种情况下, 来自入站Redis消息的原始byte []将作为消息有效负载发送到通道。默认情况下, 它是一个JdkSerializationRedisSerializer。
- receive-timeout:弹出操作等待队列中的Redis消息的超时时间(以毫秒为单位)。其默认值为1秒。
- recovery-interval:以毫秒为单位的时间, 在pop操作发生异常之后, 侦听器任务应在重新启动侦听器任务之前进入休眠状态。
- 期望消息:指定此端点是否希望Redis队列中的数据包含整个消息。如果将此属性设置为true, 则序列化器不能为空字符串, 因为消息需要某种形式的反序列化(默认情况下为JDK序列化)。其默认值为false。
- task-executor:对Spring TaskExecutor(或标准JDK 1.5+ Executor)bean的引用。它用于基础侦听任务。默认情况下, 使用SimpleAsyncTaskExecutor。
- right-pop:指定此端点应使用向右弹出(当为true时)还是向左弹出(当为false时)从Redis列表中读取消息。如果为true, 则与默认的Redis队列出站通道适配器一起使用时, Redis列表将充当FIFO队列。设置为false可与通过右推写入列表或实现类似堆栈的消息顺序的软件一起使用。其默认值为true。
此外, json-to-object-transformer需要一个type属性才能将JSON转换为对象, 在上面设置为type =” com.srcmini.integration.spring.model.PostPublishedEvent” 。
同样, 要连接此配置, 我们将需要SpringIntegrationConfig类, 该类可以与之前相同。最后, 我们需要可以实际处理事件的服务。
public interface EventProcessingService {
void process(PostPublishedEvent event);
}@Service("RedisEventProcessingService")
public class RedisEventProcessingService implements EventProcessingService {@Override
public void process(PostPublishedEvent event) {
// TODO: Send emails here, retry strategy, etc :)
}}
运行应用程序后, 你可以在Redis中看到:
"BRPOP" "my-event-queue" "1"
总结 使用Spring Integration和Redis, 构建Spring微服务应用程序并不像往常那样令人生畏。只需少量配置和少量样板代码, 你就可以立即构建微服务架构的基础。
【微服务通信(Redis的Spring集成教程)】即使你不打算完全擦除当前的Spring项目, 也不打算在Redis的帮助下切换到新的体系结构, 通过队列获得巨大的性能改进也是非常简单的。
推荐阅读
- 使用Node.js和MongoDB轻松进行集成和端到端测试
- 构建文本分类程序(NLP教程)
- 风险与回报(了解软件容器的指南)
- 解决 Android Studio(Failed to resolve: com.android.support:appcompat-v7:29.+ 错误)
- Android Studio学习记录-第四周
- 无障碍开发(十七)之京东APP一期优化案例讲解
- Android Locale.getDefault().getCountry()为空
- all()和apply()的区别
- Android中的Prelink技术