公司在做一些金融相关业务,某些时候由于数据提供商定期维护或者特殊原因需要暂停某些服务的消费者。之前选用的消息队列技术栈是RabbitMQ,用于微服务之间的消息投递,对于这类需要暂停消费者的场景是选用注释掉消费者Bean中的相应Spring(Boot)注解重新发布来实现,后面需要重新启动消费就是解开对应的注释再发布一次。这样的处理流程既繁琐,也显得没有技术含量,所以笔者就这个问题结合已有的配置中心Nacos集群做了一个方案,使用Nacos的配置准实时刷新功能去控制某个微服务实例的所有RabbitMQ消费者(容器)的停止和启动。,,spring-boot-rabbit-nacos-control-1,下面探讨一下方案的原理和可行性,主要包括:,因为工作中的主要技术栈是SpringBoot + RabbitMQ,下文是探讨场景针对spring-boot-starter-amqp(下面简称amqp)展开。,使用SpringBoot版本为2.3.0.RELEASE,spring-cloud-alibaba-nacos-config的版本为2.2.0.RELEASE,查看RabbitAnnotationDrivenConfiguration的源码:,spring-boot-rabbit-nacos-control-2,amqp中默认启用spring.rabbitmq.listener.type=simple,使用的RabbitListenerContainerFactory(消息监听器容器工厂)实现为SimpleRabbitListenerContainerFactory,使用的MessageListenerContainer(消息监听器容器)实现为SimpleMessageListenerContainer。在amqp中,无论注解声明式或者编程式注册的消费者最终都会封装为MessageListenerContainer实例,因此消费者生命周期可以直接通过MessageListenerContainer进行管理,MessageListenerContainer的生命周期管理API会直接作用于最底层的真实消费者实现BlockingQueueConsumer。几者的关系如下:,
,spring-boot-rabbit-nacos-control-3,一般声明式消费者注册方式如下:,对于基于@RabbitListener进行声明式注册的消费者,每个被@RabbitListener修饰的Bean或者方法最终都会单独生成一个SimpleMessageListenerContainer实例,这些SimpleMessageListenerContainer实例的唯一标识由@RabbitListener的id属性指定,缺省值为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N,建议在使用时候通过规范约束必须定义此id属性。分析源码可以得知这类型的消费者通过RabbitListenerAnnotationBeanPostProcessor进行发现和自动注册,并且在RabbitListenerEndpointRegistry缓存了注册信息,因此可以通过RabbitListenerEndpointRegistry直接获取这些声明式的消费者容器实例:,一般编程式消费者注册方式如下:,编程式注册的SimpleMessageListenerContainer可以直接从IOC容器中获取:,至此,我们知道可以比较轻松地拿到服务中所有的MessageListenerContainer的实例,从而可以管理服务内所有消费者的生命周期。,Nacos的客户端通过LongPolling(长轮询)的方式监听Nacos服务端集群对应dataId和group的配置数据变更,具体可以参考ClientWorker的源码实现,实现的过程大致如下:,
,spring-boot-rabbit-nacos-control-4,在非Spring(Boot)体系中,可以通过ConfigService#addListener()进行配置变更监听,示例代码如下:,这种LongPolling的方式目前来看可靠性是比较高,因为Nacos服务端集群一般在生产部署是大于3的奇数个实例节点,并且底层基于raft共识算法实现集群通讯,只要不是同一时间超过半数节点宕机集群还是能正常提供服务。但是从实现上来看会有一些局限性:,关于配置变更监听其实有其他候选的方案,例如Redis的发布订阅,Zookeeper的节点路径变更监听甚至是使用消息队列进行通知,本文使用Nacos配置变更监听的原因是更好的划分不同应用配置文件的编辑查看权限方便进行管理,其他候选方案要实现分权限管理需要二次开发,使用SpringCloudAlibaba提供的spring-cloud-alibaba-nacos-config可以更加简便地使用Nacos配置刷新监听,并且会把变更的PropertySource重新绑定到对应的配置属性Bean。引入依赖:,具体的配置类是NacosConfigProperties:,
,spring-boot-rabbit-nacos-control-5,红圈中是需要关注的配置项,refreshEnabled是配置刷新的开关,默认是开启的。sharedConfigs和extensionConfigs虽然命名不同,但是两者实现和功能没有差异,都是类似于共享或者说扩展配置,每个共享(扩展)配置支持单独配置刷新开关。举个例子,在Nacos服务端的某个配置如下图:,
,spring-boot-rabbit-nacos-control-6,为了支持配置变更和对应的实体类成员变量更新,对应客户端的配置文件是这样的:,对应的配置属性Bean如下:,只要客户端所在SpringBoot服务启动完成后,修改Nacos服务端对应dataId为shared.properties的shared.foo属性值,那边SharedProperties的foo属性就会准实时刷新。可以在SharedProperties添加一个@PostConstruct来观察这个属性更新的过程:,整个方案实施包括下面几步:,初始化一个Maven项目,引入下面的依赖:,下载Nacos服务并且启动一个单机实例(当前2023-02的最新稳定版为2.2.0),新建命名空间LOCAL并且添加四份配置文件:,
,spring-boot-rabbit-nacos-control-7,可以使用1.x的Nacos客户端去连接2.x的Nacos服务端,这个是Nacos做的向下兼容,反过来不行,前文提到的Nacos客户端中,ConfigService是通过dataId和group定位到具体的配置文件,一般dataId按照配置文件的内容命名,对于SpringBoot的应用配置文件一般命名为application-{profile}.[properties,yml],group是配置文件的分组,对于SpringBoot的应用配置文件一般命名为{spring.application.name}。笔者在在这份SpringBoot的应用配置文件中只添加了RabbitMQ的配置:,
,spring-boot-rabbit-nacos-control-8,确保本地或者远程有一个可用的RabbitMQ服务,接下来往下开始实施方案。,前面已经提到过SpringBoot结合Nacos进行配置属性Bean的成员变量刷新,在项目的Classpath(resources文件夹)添加bootstrap.properties文件,内容如下:,这里profile定义为default也就是会关联到Nacos中dataId = ‘application.properties’, group = ‘rabbitmq-rocketmq-demo’那份配置文件,主要是用于定义amqp需要的配置属性。对于RabbitMQ消费者的开关,定义在dataId = ‘rabbitmq-toggle.properties’, group = ‘rabbitmq-rocketmq-demo’的文件中。添加RabbitmqToggleProperties:,这里prefix定义为rabbitmq.toggle,为了和rabbitmq-toggle.properties的属性一一绑定,该文件中的配置Key必须以rabbitmq.toggle为前缀。RabbitmqToggleProperties首次回调@PostConstruct方法只打印初始化日志,再次回调@PostConstruct方法则发布RabbitmqToggleRefreshEvent事件,用于后面通知对应的消费者容器Bean进行启停。,为了统一管理服务中所有消费者容器Bean,需要定义一个类似于消费者容器注册或者缓存中心类,缓存Key可以考虑使用listenerId,Value就直接使用MessageListenerContainer实例即可:,这里既然选定了listenerId作为缓存的Key,那么必须定义好规范,要求无论注解声明式定义的消费者还是编程式定义的消费者,必须明确指定具体意义的listenerId,否则到时候存在Key的格式为org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#N会比较混乱,接下来发现和缓存所有消费者容器:,StaticEventPublisher中的ApplicationEventPublisher属性延迟到所有消费者容器缓存完成后赋值,防止过早的属性变更通知导致部分消费者容器的启停操作被忽略。,接收到RabbitmqToggleRefreshEvent事件后,然后遍历传递过来的RabbitmqToggleProperties里面的consumers,再基于已经发现的消费者容器进行处理,代码大概如下:,修改Nacos服务里面的rabbitmq-toggle.properties文件,输入内容如下:,启动项目,观察RabbitMQ WebUI对应的队列消费者数量:,
,spring-boot-rabbit-nacos-control-9,然后随机修改rabbitmq-toggle.properties文件某个消费者容器设置为enable = ‘fasle’,观察服务日志和观察RabbitMQ WebUI的变化:,
,spring-boot-rabbit-nacos-control-10,可见RabbitMQ WebUI中队列消费者数量减少,服务日志也提示listenerId = ‘MessageListenerDemoConsumer’的消费者容器被停止了。,为了更精确控制有消费者容器的启停,可以考虑在配置文件中定义关闭消费者容器的自动启动开关:,可以考虑在RabbitmqToggleProperties首次回调@PostConstruct方法时候发布RabbitmqToggleInitEvent事件,然后监听此事件启动所有已经发现的消费者容器。这样就能做到应用内部的消费者的启停行为总是以Nacos的开关配置文件为准,并且可以实现「在线」启停和动态调整最小最大消费者数量。,另外,如果细心的话能够观察到服务日志中,每当监听到Nacos配置变动会打印Started application in N seconds (JVM running for M)的日志,这个并不是服务重启了,而是启动了一个Spring子容器用于构建一个全新的StandardEnvironment(见文末Demo项目中的EnvironmentCaptureApplicationRunner)用来承载刷新后的配置文件内容,然后再拷贝或者覆盖到当前的Spring容器中的PropertySources,这个过程的代码实现类似这样:,
,spring-boot-rabbit-nacos-control-11,本文探讨了一种通过Nacos配置刷新方式管理SpringBoot服务中RabbitMQ消费者生命周期管理的方案,目前只是提供了完整的思路和一些Demo级别代码,后续应该会完善方案和具体的工程级别编码实现。,本文Demo项目仓库:
文章版权声明
1 原创文章作者:cmcc,如若转载,请注明出处: https://www.52hwl.com/20741.html
2 温馨提示:软件侵权请联系469472785#qq.com(三天内删除相关链接)资源失效请留言反馈
3 下载提示:如遇蓝奏云无法访问,请修改lanzous(把s修改成x)
4 免责声明:本站为个人博客,所有软件信息均来自网络 修改版软件,加群广告提示为修改者自留,非本站信息,注意鉴别