ActiveMQ介绍
ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的、面向消息的、能够跨越多语言和多系统的应用集成消息通信中间件。
它为企业应用中消息传递提供高可用、出色性能、可扩展、稳定和安全保障。
ActiveMQ实现JMS规范并在此之上提供大量额外的特性。ActiveMQ支持队列和订阅两种模式的消息发送。
AcitveMQ的数据传送流程如下图:

ActiveMQ的两种消息传递类型:
(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。
(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的。
两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。
Spring Boot集成ActiveMQ
pring Boot针对ActiveMQ专门提供了spring-boot-starter-activemq,用来支持ActiveMQ在Spring Boot的自动集成配置。在此基础上我们可以很轻易的进行集成和使用。
创建项目并引入依赖
创建标准的Spring Boot项目,并在项目中引入以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>配置文件
在application.yml中添加如下配置:
spring:
activemq:
#基于内存的ActiveMQ
#in-memory: true
#不使用连接池,如果使用连接池还需在pom中添加activemq-pool的依赖
#pool.enabled: false
#独立安装的ActiveMQ#
broker-url: tcp://192.168.2.113:61616
user: admin
password: 654321上述配置中有两套配置,Spring Boot支持基于内存ActiveMQ和基于独立安装的ActiveMQ。正常请求基于内存的形式是为了方便测试而使用,基于独立安装的形式才是真正用于生产环境。此处为了讲解功能,方便测试,采用基于内存的形式。
队列模式实例
首先,我们来实现基于队列(Queue)形式的实现。这里需要用到两个类ActiveMQQueue和JmsMessagingTemplate。前者是由ActiveMQ对javax.jms.Queue的接口实现。后者为Spring提供发送消息的工具类,结合Queue对消息进行发送。
JmsMessagingTemplate默认已经被实例化,直接拿来使用即可。而ActiveMQQueue则需要我们进行实例化,并传入消息队列的名称。
@Configuration
public class MyMqConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue("sms.queue");
}
}Spring Boot中很常规的实例化操作,不再赘述。当实例化完ActiveMQQueue之后,我们的队列便创建完成,下面创建对应的生产者和消费者。
生产者对应代码如下:
@Component
public class Producer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
@Resource
private Queue queue;
public void sendMsg(String msg) {
System.out.println("发送消息内容 :" + msg);
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
}
}此处用到JmsMessagingTemplate和Queue,上面已经提到,这两个类都已经完成了初始化。消费者对应的配置如下:
@Component
public class Consumer {
@JmsListener(destination = "sms.queue")
public void receiveMsg(String text) {
System.out.println("接收到消息 : "+text);
}
}Spring提供了注解式监听器端点:使用@JmsListener。使用@JmsListener托管bean的带注释方法对其进行订阅。在Java8中,@JmsListener是一个可重复的注解,可以关联多个JMS destinations到同一个方法中。而在Java 6和7中,可以使用@JmsListeners注解。
其中destination指定监控的消息队列名称为“sms.queue”。当队列sms.queue中有消息发送时会触发此方法的执行,text为消息内容。
上面完成了队列初始化、生产者和消费者代码的编写,下面通过单元测试来验证是否能够正确发送和处理消息。
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActiveMqTests {
@Autowired
private Producer producer;
@Test
public void sendSimpleQueueMessage() {
this.producer.sendMsg("发送消息.....");
}
}执行单元测试,会发现在日志中打印如下信息:
发送消息内容 :发送消息.....
接收到消息 : 发送消息.....说明消息可以正常发送和接收。如果是基于内存模式,在执行单元测试时会打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”异常日志,这是Info级别的错误,是ActiveMQ的一个bug。