- 首先在pom文件中引入依赖
<!-- Spring Boot starter for ActiveMQ / JMS support (version managed by the Spring Boot parent). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ Spring integration; this guide's Java config imports BrokerFactoryBean and the
     Spring-aware ActiveMQConnectionFactory from it. -->
<!-- NOTE(review): 5.12.3 is an old ActiveMQ release; check it against current security advisories. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>5.12.3</version>
</dependency>
<!-- KahaDB message store, needed because activemq.xml configures a kahaDB persistence adapter. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-kahadb-store</artifactId>
    <version>5.12.3</version>
</dependency>
- 新建activemq.xml配置文件放在resources目录下
<!-- Spring beans file declaring the embedded ActiveMQ broker. -->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Broker named "mybroker" with JMX enabled. -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="mybroker" useJmx="true">

        <!-- Persist messages with KahaDB in the ./kahadb directory. -->
        <persistenceAdapter>
            <kahaDB directory="./kahadb"/>
        </persistenceAdapter>

        <!-- OpenWire (TCP) connector listening on port 61616 on all interfaces. -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

    </broker>
</beans>
- 在yml文件中配置activemq连接信息
server:
  port: 81
spring:
  activemq:
    # Broker address the client connects to
    broker-url: tcp://127.0.0.1:61616
    # Username
    user: admin
    # Password
    password: admin
    pool:
      # Enable the connection pool
      enabled: true
      # Maximum number of pooled connections
      max-connections: 5
      # Idle timeout for pooled connections (presumably milliseconds - confirm for your Spring Boot version)
      idle-timeout: 30000
  jms:
    # Enable publish/subscribe (topic) mode
    pub-sub-domain: true
- 新建配置文件ActiveMqConf.java
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* @author ssy
* @date 2021/8/9 11:55
*/
@Component
public class ActiveMqConf {
//队列
public static final String myQueue = "myQueue";
//主题
public static final String myTopic = "myTopic";
@Value("${spring.activemq.broker-url}")
private String mqUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
@Bean
public BrokerFactoryBean activemq() {
BrokerFactoryBean broker = new BrokerFactoryBean();
broker.setConfig(new ClassPathResource("activemq.xml"));
broker.setStart(true);
return broker;
}
@Bean
public ActiveMQConnectionFactory createFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(mqUrl);
factory.setUserName(user);
factory.setPassword(password);
RedeliveryPolicy policy = new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
policy.setUseExponentialBackOff(true);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
policy.setBackOffMultiplier(2);
// 每隔几秒重试一次
policy.setInitialRedeliveryDelay(30000);
//重发时间间隔
policy.setRedeliveryDelay(5000);
//最大重发次数
policy.setMaximumRedeliveries(3);
//setUseExponentialBackOff = true时有效,假设首次重连间隔为10ms,倍数为2,那么第 二次重连时间间隔为 20ms,第三次重连时间间隔为40ms
// 当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔
policy.setMaximumRedeliveryDelay(10000);
factory.setRedeliveryPolicy(policy);
//设置对象可作为消息传递的包
factory.setTrustAllPackages(true);
// factory.setBrokerURL(springBootMQConfig.getBrokerUrl());
return factory;
}
@Bean
public JmsTemplate createJsmTemplate(ActiveMQConnectionFactory factory) {
JmsTemplate template = new JmsTemplate(factory);
template.setSessionTransacted(false);
//设置ack为手动确认模式
template.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
template.setConnectionFactory(factory);
template.setDeliveryMode(2);
return template;
}
@Bean("jmsListenerContainerFactory")
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory factory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setSessionTransacted(false);
bean.setPubSubDomain(false);
bean.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
bean.setConnectionFactory(factory);
return bean;
}
@Bean
public Topic topic(){
return new ActiveMQTopic(myTopic);
}
@Bean("jmsListenerContainerTopic")
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory factory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(factory);
return bean;
}
}
- 新建ActiveMqProduceService.java 生产者接口文件
/**
* @author ssy
* @date 2021/8/9 16:16
*/
public interface ActiveMqProduceService {
//点对点通信
void sendMsg(String msg);
//发布订阅
void sendTopicMsg(String msg);
}
- 新建ActiveMqProduce.java 实现接口文件
import cn.stylefeng.guns.modular.config.ActiveMqConf;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Topic;
/**
 * {@link ActiveMqProduceService} implementation that sends through Spring's
 * {@link JmsMessagingTemplate}.
 *
 * @author ssy
 * @date 2021/8/9 14:39
 */
@Service
public class ActiveMqProduce implements ActiveMqProduceService {

    /**
     * Messaging template used for all sends.
     *
     * <p>{@code @Resource} looks up a bean by the field name before falling back to the type.
     * The field is therefore named after its own type rather than {@code jmsTemplate}, so a
     * bean called {@code jmsTemplate} (of type {@code JmsTemplate}) can never be picked up
     * here and fail injection with a type mismatch.
     */
    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;

    /** Topic destination used by {@link #sendTopicMsg(String)}. */
    @Resource
    private Topic topic;

    /**
     * Sends the message to the queue named {@code ActiveMqConf.myQueue}.
     *
     * @param msg message text
     */
    @Override
    public void sendMsg(String msg) {
        jmsMessagingTemplate.convertAndSend(ActiveMqConf.myQueue, msg);
    }

    /**
     * Publishes the message to the injected topic.
     *
     * @param msg message text
     */
    @Override
    public void sendTopicMsg(String msg) {
        jmsMessagingTemplate.convertAndSend(topic, msg);
    }
}
- 新建ActiveMqConsumer.java 文件
package cn.stylefeng.guns.modular.activemq;
import cn.stylefeng.guns.modular.config.ActiveMqConf;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.Session;
/**
 * JMS listeners: two consumers on the queue and two subscribers on the topic.
 *
 * @author ssy
 * @date 2021/11/3 9:22
 */
@Component
@Slf4j
public class ActiveMqConsumer {

    /**
     * First listener on the {@code myQueue} queue; logs the text and acknowledges the message.
     *
     * @param mqMessage received text message
     * @param session   JMS session supplied by the listener container (not used here)
     * @throws Exception if reading or acknowledging the message fails
     */
    @JmsListener(destination = ActiveMqConf.myQueue, containerFactory = "jmsListenerContainerFactory")
    @Transactional(rollbackFor = Exception.class, timeout = 15)
    public void receive1(ActiveMQTextMessage mqMessage, Session session) throws Exception {
        logThenAcknowledge("myQueue1收到消息:{}", mqMessage);
    }

    /**
     * Second listener on the {@code myQueue} queue; logs the text and acknowledges the message.
     *
     * @param mqMessage received text message
     * @param session   JMS session supplied by the listener container (not used here)
     * @throws Exception if reading or acknowledging the message fails
     */
    @JmsListener(destination = ActiveMqConf.myQueue, containerFactory = "jmsListenerContainerFactory")
    @Transactional(rollbackFor = Exception.class, timeout = 15)
    public void receive2(ActiveMQTextMessage mqMessage, Session session) throws Exception {
        logThenAcknowledge("myQueue2收到消息:{}", mqMessage);
    }

    /**
     * First subscriber on the {@code myTopic} topic.
     *
     * <p>Per the original author's note: a {@code @JmsListener} without a dedicated topic
     * {@code containerFactory} only consumes queue messages and will not receive topic
     * publications.
     *
     * @param text received text message
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = ActiveMqConf.myTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic1(ActiveMQTextMessage text) throws JMSException {
        String body = text.getText();
        log.info("topic1收到消息:{}", body);
    }

    /**
     * Second subscriber on the {@code myTopic} topic.
     *
     * @param text received text message
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = ActiveMqConf.myTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic2(ActiveMQTextMessage text) throws JMSException {
        String body = text.getText();
        log.info("topic2收到消息:{}", body);
    }

    /** Logs the message text with the given pattern, then acknowledges the message. */
    private void logThenAcknowledge(String pattern, ActiveMQTextMessage mqMessage) throws JMSException {
        String body = mqMessage.getText();
        log.info(pattern, body);
        mqMessage.acknowledge();
    }
}
- 测试:新建MqConreoller.java文件
import cn.stylefeng.guns.modular.activemq.ActiveMqProduceService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * Test endpoints that push a message through the producer service.
 *
 * @author ssy
 * @date 2022/6/29 15:38
 */
@RestController
public class MqConreoller {

    @Resource
    private ActiveMqProduceService mqProduceService;

    /**
     * {@code GET /mq/send}: sends {@code msg} to the queue.
     *
     * @param msg message text taken from the request
     */
    @GetMapping(path = "/mq/send")
    public void send(String msg) {
        this.mqProduceService.sendMsg(msg);
    }

    /**
     * {@code GET /mq/sendTopic}: publishes {@code msg} to the topic.
     *
     * @param msg message text taken from the request
     */
    @GetMapping(path = "/mq/sendTopic")
    public void sendTopic(String msg) {
        this.mqProduceService.sendTopicMsg(msg);
    }
}
浏览器多次访问http://localhost:81/mq/send?msg=123
可以看到控制台轮询打印 :
myQueue1收到消息:123
myQueue2收到消息:123
浏览器多次访问http://localhost:81/mq/sendTopic?msg=123
可以看到控制台同时打印:
topic1收到消息:123
topic2收到消息:123
版权声明:本文为qq_43549747原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。