Active-MQ整合Spring Boot实现简单生产者,消费者

 ActiveMQ介绍

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的、面向消息的、能够跨越多语言和多系统的应用集成消息通信中间件。

它为企业应用中消息传递提供高可用、出色性能、可扩展、稳定和安全保障。

ActiveMQ实现JMS规范并在此之上提供大量额外的特性。ActiveMQ支持队列和订阅两种模式的消息发送。

AcitveMQ的数据传送流程如下图:

ActiveMQ的两种消息传递类型:

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的。

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

 Spring Boot集成ActiveMQ

pring Boot针对ActiveMQ专门提供了spring-boot-starter-activemq,用来支持ActiveMQ在Spring Boot的自动集成配置。在此基础上我们可以很轻易的进行集成和使用。

创建项目并引入依赖

创建标准的Spring Boot项目,并在项目中引入以下依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
</dependency>

配置文件

在application.yml中添加如下配置:

spring:
  activemq:
    #基于内存的ActiveMQ
    #in-memory: true
    #不使用连接池,如果使用连接池还需在pom中添加activemq-pool的依赖
    #pool.enabled: false
    #独立安装的ActiveMQ#
    broker-url: tcp://192.168.2.113:61616
    user: admin
    password: 654321

上述配置中有两套配置,Spring Boot支持基于内存ActiveMQ和基于独立安装的ActiveMQ。正常请求基于内存的形式是为了方便测试而使用,基于独立安装的形式才是真正用于生产环境。此处为了讲解功能,方便测试,采用基于内存的形式。

队列模式实例

首先,我们来实现基于队列(Queue)形式的实现。这里需要用到两个类ActiveMQQueue和JmsMessagingTemplate。前者是由ActiveMQ对javax.jms.Queue的接口实现。后者为Spring提供发送消息的工具类,结合Queue对消息进行发送。

JmsMessagingTemplate默认已经被实例化,直接拿来使用即可。而ActiveMQQueue则需要我们进行实例化,并传入消息队列的名称。

@Configuration
public class MyMqConfig {

	@Bean
	public Queue queue() {
		return new ActiveMQQueue("sms.queue");
	}

}

Spring Boot中很常规的实例化操作,不再赘述。当实例化完ActiveMQQueue之后,我们的队列便创建完成,下面创建对应的生产者和消费者。

生产者对应代码如下:

@Component
public class Producer {

	@Resource
	private JmsMessagingTemplate jmsMessagingTemplate;

	@Resource
	private Queue queue;

	public void sendMsg(String msg) {
		System.out.println("发送消息内容 :" + msg);
		this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
	}

}

此处用到JmsMessagingTemplate和Queue,上面已经提到,这两个类都已经完成了初始化。消费者对应的配置如下:

@Component
public class Consumer {

	@JmsListener(destination = "sms.queue")
	public void receiveMsg(String text) {
		System.out.println("接收到消息 : "+text);
	}
}

Spring提供了注解式监听器端点:使用@JmsListener。使用@JmsListener托管bean的带注释方法对其进行订阅。在Java8中,@JmsListener是一个可重复的注解,可以关联多个JMS destinations到同一个方法中。而在Java 6和7中,可以使用@JmsListeners注解。

其中destination指定监控的消息队列名称为“sms.queue”。当队列sms.queue中有消息发送时会触发此方法的执行,text为消息内容。

上面完成了队列初始化、生产者和消费者代码的编写,下面通过单元测试来验证是否能够正确发送和处理消息。

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActiveMqTests {

	@Autowired
	private Producer producer;

	@Test
	public void sendSimpleQueueMessage() {
		this.producer.sendMsg("发送消息.....");
	}

}

执行单元测试,会发现在日志中打印如下信息:

发送消息内容 :发送消息.....

接收到消息 : 发送消息.....

 说明消息可以正常发送和接收。如果是基于内存模式,在执行单元测试时会打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”异常日志,这是Info级别的错误,是ActiveMQ的一个bug。


版权声明:本文为YuChen954原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。