1 概述
Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
2 核心概念
绑定器
Binder 绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。
通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。甚至可以任意的改变中间件的类型而不需要修改一行代码。
通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。
发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。
3 入门案例
3.1 准备工作
案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装
3.2 消息生产者
(1)创建工程引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
(2)定义bingding
发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生产者。
(3)配置application.yml.
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output:
destination: itcast-default #指定消息发送的目的地,在rabbitmq中,发送到一个itcast-default的exchange中
contentType: text/plain # 用于指定消息的类型
binders: #配置绑定器
defaultRabbit:
type: rabbit
(4)测试发送消息
/**
* 入门案例:
* 1.引入依赖
* 2.配置application.yml文件
* 3.发送消息的话,定义一个通道接口,通过接口中内置的messagechannel
* springcloudstream中内置接口 Source
* 4.@EnableBinding : 绑定对应通道
* 5.发送消息的话,通过MessageChannel发送消息
* * 如果需要MessageChannel --> 通过绑定的内置接口获取
*/
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication implements CommandLineRunner {
@Autowired
@Qualifier("output")
private MessageChannel output;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
@Override
public void run(String... args) throws Exception {
//发送MQ消息
output.send(MessageBuilder.withPayload("hello world").build());
}
}
3.3 消息消费者
(1)创建工程引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
(2)定义bingding
同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。
这就接口声明了一个 binding 命名为 “input” 。
(3)配置application.yml
spring:
cloud:
stream:
bindings:
input:
destination: itcast-default
destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 itcastdefault
(4) 测试
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("监听收到:" + message.getPayload());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
- 定义一个 class (这里直接在启动类),并且添加注解 @EnableBinding(Sink.class) ,其中Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 @StreamListener(Processor.INPUT),方法参数为 Message 。
- 启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “itcast-default ”,routing key为 “#”。
- 所有发送 exchange 为“itcast-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。
4 自定义消息通道
Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。
/**
* 自定义的消息通道
*/
public interface MyProcessor {
/**
* 消息生产者的配置
*/
String MYOUTPUT = "myoutput";
@Output("myoutput")
MessageChannel myoutput();
/**
* 消息消费者的配置
*/
String MYINPUT = "myinput";
@Input("myinput")
SubscribableChannel myinput();
}
- 一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。
- 使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
- 使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output:
destination: itcast-default #指定消息发送的目的地,在rabbitmq中,发送到一个itcast-default的exchange中
myoutput:
destination: itcast-custom-output
producer:
partition-key-expression: payload #分区关键字 对象中的id,对象
partition-count: 2 #分区大小
binders: #配置绑定器
defaultRabbit:
type: rabbit
5 消息分组
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
实现的方式非常简单,我们只需要在服务消费者端设置
spring.cloud.stream.bindings.input.group 属性即可,比如我们可以这样实现:
server:
port: 7003 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: itcast
password: itcast
virtual-host: myhost
cloud:
stream:
bindings:
input:
destination: itcast-default
inputOrder:
destination: testChannel
group: group-2
binders:
defaultRabbit:
type: rabbit
在同一个group中的多个消费者只有一个可以获取到消息并消费
6 消息分区
有一些场景需要满足, 同一个特征的数据被同一个实例消费, 比如同一个id的传感器监测数据必须被同一个实例统计计算分析, 否则可能无法获取全部的数据。又比如部分异步任务,首次请求启动task,二次请求取消task,此场景就必须保证两次请求至同一实例.
消息消费者配置
cloud:
stream:
instance-count: 2
instance-index: 0
bindings:
input:
destination: itcast-default
inputOrder:
destination: testChannel
group: group-2
consumer:
partitioned: true
binders:
defaultRabbit:
type: rabbit
从上面的配置中,我们可以看到增加了这三个参数:
- spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分
区功能; - spring.cloud.stream.instanceCount :该参数指定了当前消费者的总实例数量;
- spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount 参数 - 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。
消息生产者配置
spring:
application:
name: rabbitmq-producer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: itcast
password: itcast
virtual-host: myhost
cloud:
stream:
bindings:
input:
destination: itcast-default
outputOrder:
destination: testChannel
producer:
partition-key-expression: payload
partition-count: 2
binders:
defaultRabbit:
type: rabbit
从上面的配置中,我们可以看到增加了这两个参数:
- pring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数
指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键; - spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分
区的数量。
到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。