(十八)Spring Cloud 学习笔记之Spring Cloud Stream

1 概述

Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
在这里插入图片描述
说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费

2 核心概念

绑定器
Binder 绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。

通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。甚至可以任意的改变中间件的类型而不需要修改一行代码。

通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。

发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。
在这里插入图片描述

3 入门案例

3.1 准备工作

案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装

3.2 消息生产者

(1)创建工程引入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

(2)定义bingding
发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生产者。
(3)配置application.yml.

spring:
  application:
    name: stream_producer #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: itcast-default  #指定消息发送的目的地,在rabbitmq中,发送到一个itcast-default的exchange中
          contentType: text/plain # 用于指定消息的类型
      binders:  #配置绑定器
        defaultRabbit:
          type: rabbit

(4)测试发送消息

/**
 * 入门案例:
 *      1.引入依赖
 *      2.配置application.yml文件
 *      3.发送消息的话,定义一个通道接口,通过接口中内置的messagechannel
 *              springcloudstream中内置接口  Source
 *      4.@EnableBinding : 绑定对应通道
 *      5.发送消息的话,通过MessageChannel发送消息
 *          * 如果需要MessageChannel --> 通过绑定的内置接口获取
 */
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication implements CommandLineRunner {
    @Autowired
    @Qualifier("output")
    private MessageChannel output;
    
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }

    @Override
    public void run(String... args) throws Exception {
        //发送MQ消息
        output.send(MessageBuilder.withPayload("hello world").build());
    }
}

3.3 消息消费者

(1)创建工程引入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

(2)定义bingding
同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。

public interface Sink {
    String INPUT = "input";
    @Input("input")
    SubscribableChannel input();
}

注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。
这就接口声明了一个 binding 命名为 “input” 。

(3)配置application.yml

spring:
 cloud:
   stream:
     bindings:
       input:
         destination: itcast-default

destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 itcastdefault
(4) 测试

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("监听收到:" + message.getPayload());
   }
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
   }
}
  • 定义一个 class (这里直接在启动类),并且添加注解 @EnableBinding(Sink.class) ,其中Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 @StreamListener(Processor.INPUT),方法参数为 Message 。
  • 启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “itcast-default ”,routing key为 “#”。
  • 所有发送 exchange 为“itcast-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。

4 自定义消息通道

Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。

/**
 * 自定义的消息通道
 */
public interface MyProcessor {

	/**
	 * 消息生产者的配置
	 */
	String MYOUTPUT = "myoutput";

	@Output("myoutput")
	MessageChannel myoutput();

	/**
	 * 消息消费者的配置
	 */
	String MYINPUT = "myinput";

	@Input("myinput")
	SubscribableChannel myinput();
}

  • 一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。
  • 使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
  • 使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER
spring:
 application:
   name: stream_producer #指定服务名
 rabbitmq:
   addresses: 127.0.0.1
   username: guest
   password: guest
 cloud:
   stream:
     bindings:
       output:
         destination: itcast-default  #指定消息发送的目的地,在rabbitmq中,发送到一个itcast-default的exchange中
       myoutput:
         destination: itcast-custom-output
         producer:
           partition-key-expression: payload  #分区关键字   对象中的id,对象
           partition-count: 2  #分区大小
     binders:  #配置绑定器
       defaultRabbit:
         type: rabbit

5 消息分组

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
在这里插入图片描述
实现的方式非常简单,我们只需要在服务消费者端设置
spring.cloud.stream.bindings.input.group 属性即可,比如我们可以这样实现:

server:
 port: 7003 #服务端口
spring:
 application:
   name: rabbitmq-consumer #指定服务名
 rabbitmq:
   addresses: 127.0.0.1
   username: itcast
   password: itcast
   virtual-host: myhost
 cloud:
   stream:
     bindings:
       input:
         destination: itcast-default
       inputOrder:
         destination: testChannel
         group: group-2
     binders:
       defaultRabbit:
         type: rabbit

在同一个group中的多个消费者只有一个可以获取到消息并消费

6 消息分区

有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例.
在这里插入图片描述
消息消费者配置

 cloud:
   stream:
     instance-count: 2
     instance-index: 0
     bindings:
       input:
         destination: itcast-default
       inputOrder:
         destination: testChannel
         group: group-2
         consumer:
           partitioned: true
     binders:
       defaultRabbit:
         type: rabbit

从上面的配置中,我们可以看到增加了这三个参数:

  1. spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分
    区功能;
  2. spring.cloud.stream.instanceCount :该参数指定了当前消费者的总实例数量;
  3. spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount 参数 - 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。

消息生产者配置

spring:
application:
  name: rabbitmq-producer #指定服务名
rabbitmq:
  addresses: 127.0.0.1
  username: itcast
  password: itcast
  virtual-host: myhost
cloud:
  stream:
    bindings:
      input:
        destination: itcast-default
      outputOrder:
        destination: testChannel
        producer:
          partition-key-expression: payload
          partition-count: 2
    binders:
      defaultRabbit:
        type: rabbit

从上面的配置中,我们可以看到增加了这两个参数:

  1. pring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数
    指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
  2. spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分
    区的数量。

到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。


版权声明:本文为weixin_43596905原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。