解决SpringBoot整合RocketMQ遇到的坑

应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。
引入依赖

org.apache.rocketmqrocketmq-spring-boot-starter2.0.4

消费者代码
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")public class Consumer implements RocketMQListener {@Overridepublic void onMessage(String s) {System.out.println("消费到的数据为:"+s); }}

问题排查 【解决SpringBoot整合RocketMQ遇到的坑】RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。
@Overridepublic void afterSingletonsInstantiated() {// 获取所有所有使用了RocketMQMessageListener注解的beanMap beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) {// 循环注册容器beans.forEach(this::registerContainer); }}private void registerContainer(String beanName, Object bean) {Class clazz = AopProxyUtils.ultimateTargetClass(bean); // 校验当前bean是否实现了RocketMQListener接口if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); }// 获取bean上的annotationRocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // 解析group及topic,可支持表达式String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled =(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP).getOrDefault(topic, true); if (!listenerEnabled) {log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",consumerGroup, topic); return; }validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; // 注册bean的,调用createRocketMQListenerContainergenericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class); if (!container.isRunning()) {try {container.start(); } catch (Exception e) {log.error("Started container failed. {}", container, e); throw new RuntimeException(e); }}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); }private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,RocketMQMessageListener annotation) {DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) {container.setAccessChannel(AccessChannel.valueOf(accessChannel)); }container.setTopic(environment.resolvePlaceholders(annotation.topic())); // 此处已经根据表达式将数据取出String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) {container.setSelectorExpression(tags); }container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); // 此处将SelectorExpression的数据覆盖成了表达式container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener)bean); container.setObjectMapper(objectMapper); container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); // REVIEW ME, use the same clientId or multiple?return container; }

问题解决 因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。
/** * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据**/@Configurationpublic class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {@Autowiredprivate ApplicationContext applicationContext; @Autowiredprivate StandardEnvironment environment; @Overridepublic void afterPropertiesSet() throws Exception {Map beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); for (Object bean : beans.values()){Class clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {continue; }RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation); Field field = invocationHandler.getClass().getDeclaredField("memberValues"); field.setAccessible(true); Map memberValues = (Map) field.get(invocationHandler); for (Map.Entry entry: memberValues.entrySet()) {if(Objects.nonNull(entry)){memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue()))); }}}}}

初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

    推荐阅读