springcloud 之stream

spring cloud stream

快速入门

  • 加入依赖
<dependency>
<groupid>org.springframework.cloud</groupid>
<artifactid>spring-cloud-starter-stream-rabbit</artifactid>
</dependency>
  • 创建用于接收来自 RabbitMQ 消息的消费者 SinkReceiver
    在这里插入图片描述
    rabbitMQ默认都是基本配置,没有其他的修改,
  • @EnableBinding, 该注解用来指定一个或多个定义了@input或@Output 注解的接口,以此实现对消息通道 Channel) 的绑定
  • @EnableBinding(Sink.class)绑定了 Sink接口, 该接口是 Spring Cloud Stream中默认实现的对输入消息通道绑定的定义
  • stream默认实现了绑定 output 通道的 Source 接口, 还有结合了Sink和Source的Processor接口,实际使用时我们也可以自已通过@Input和@Output注解来定义绑定消息通道的接口
    源码为
    在这里插入图片描述
    @Streamlistener:定义在方法上, 作用是将被修饰的方法注册为消息中间件上数据流的事件监听器, 注解中的属性值对应了监听的消息通道名

核心概念

在这里插入图片描述
Spring Cloud Stream构建的应用程序与消息中间件之间是通过绑定器 Binder相关联的,绑定器对于应用程序而言起到了隔离作用, 它使得不同消息中间件的实现细节对应用程序来说是透明的

binder

绑定器使得应用和消息中间件解耦,不会因为中间件的更新或者修改,而做大规模的修改.通过向应用暴露了channel通道,如果要修改binder可以直接在配置文件中配置
在这里插入图片描述

发布-订阅模式

  • 发布消息到中间件之后,然后会统一到一个topic的概念,就是每个中间件对应的概念(kafka的topic,rabbit的Exchange),然后会传播到有订阅的地方,Spring Cloud Stream采用的发布-订阅模式可以有效降低消息生产者与消费者之间的耦合。 当需要对同一类消息增加一种处理方式时,只需要增加一个应用程序并将输入通道绑定到既有的Topic中就可以实现功能的扩展,而不需要改变原来已经实现的任何内容

消费组

  • 当消息生产者产生消息的时候,对应的订阅者会消费消息,为了避免一条消息被消费多次的情况,springcloud stream有一个消费组的概念,通过spring.cloud.stream.binding.input.group为应用指定一个属性名,这样子就能保证多个实例(同个组)同时接受到消息,但是只有一个能消费,
    在这里插入图片描述
    默认情况,如果没有指定消费组,也会创建一个独立的匿名消费组,所以说如果消息推送之后,所有的独立的匿名的消费组都会消费这条消息,因为自身就是一组

消息分区

对于一些特征的消息(比如固有id),希望被同一个消费者消费的情况,就有了消息分区的概念,

使用详解

通过 @EnableBinding 注解来为应用启动消息驱动的功能,在value里面指定通道(输入还是输出,也可以多个),里面包含了

  • @Configuration:同时也是一个配置类
  • @ChannelBindingServiceConfiguration :加载消费通道绑定需要的一些实例, 比如用于处理消息通道绑定的 ChannelBindingService 实例.消息类型转换器 MessageConverterConfigurer.消息通道工厂 BindableChannel­Factory 等重要实例
  • binderFactoryConfiguration: Binder工厂的配置,主要用来加载与消息中间件相关的配置信息
  • SpelExpressionConverterConfiguration: SpEL表达式转换器配置

绑定消息通道

  • 在接口中通过Input和@Output注解来定义消息通道, 而用于定义绑定消息通道的接口则可以被@EnableBinding注解的value参数来指定,从而在应用启动的时候实现对定义消息通道的绑定
  • 如果使用input和output注解,不指定具体的value值,默认使用方法名作为消息通道的名称,
  • 定义输出通道的时候,需要返回MessageChannel接口对象, 该接口定义了向消息通道发送消息的方法;
  • 而定义输入通道时,需要返回SubscribableChannel接口对象,该接口 继承自MessageChannel接口,它定义了维护消息通道订阅者的方法。

注入绑定接口

  • 创建一个将Input消息通道作为输出通道的接口,具体如下
    在这里插入图片描述
  • 在@EnableBinding 注解中增加对SinkSender接口的指定,使Spring Cloud Stream能创建出对应的实例
    在这里插入图片描述
  • 创建一个单元测试类,通过 @Autowired注解注入SinkSender的实例,并在测试用例中调用它的发送消息方法
    在这里插入图片描述

注入消息通道

可以通过上面的接口直接注入一个
在这里插入图片描述

  • 但是要注意参数命名需要与通道同名才能被正确注入,或者也可以使用 @Qualifier 注解来特别指定具体实例的名称,该名称需要与定义 MessageChannel 的 @Output 中的value 参数 一 致, 这样才能被正确注入

消息生产和消费

@StrearnListener / @ServiceActivator

  • 通过该注解修饰的方法,Spring Cloud Steam会将其注册为输入消息通道的监听器 .当输入消息通道中有消息到达的时候,会立即触发该注解修饰方法的处理逻辑对消息进行消费, @StearnListener 相比 @ServiceActivator 更为强大,因为它还内置了 一 系列的消息转换功能
  • 由于 @ServiceActivator本身不具备对消息的转换能力,所以当代表 User 对象的JSON 字符串到达后, 它自身无法将其转换成 User 对象.所以,这里需要通过@Transformer 注解帮助将字符串类型的消息转换成 User 对象
    @SendTo可以把监听到的消息转发给另外一个通道(但是一定要有Streamlistenner或者ServiceActivator)

版权声明:本文为weixin_42672777原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。