消息中间件:消息的传递者,在开发软件中原本的模式类似于A服务直接调用B服务内容,然后可能还要调用C服务,类似于我有个东西需要发送给朋友B,朋友C,在这个发送过程中会经过山河大海,也有可能在朋友B这里被卡住,为了方便,后面就出现了快递公司这样一个机构,我们把东西交给快递公司,快递公司会自己进行邮寄工作,而我本身就可以自己去自己的事了,那么在软件开发中,消息中间件类似于在A服务与B服务中间增加了一个软件项目,A服务发送消息后就可以做自己的事了,接下来的处理就是B服务的事情,以此来减少A服务的相应时间,提高速度
作用:
异步处理 :在执行的时候B服务并没有在A服务执行的时候进行操作,这称为异步操作
应用解耦 :A服务原本依赖于B服务操作,当使用中间件的时候A服务不再依赖于B服务,依赖降低了
流量削峰 :在B服务消费的时候可以根据操作进行额外处理
常用消息队列
kafka 吞吐量庞大,用于日志
activeMQ 速度快,功能强大
rabbitMQ 事务性高,应用于金融
模式:
队列模式P2P:点对点的方式,生产者生产的消息,消费者只能有一个消费者处理
发布订阅:publisher生产端,subscriber订阅者订阅,一个消息发送完成后可以被多个消费者消费,这里的消费者也称为订阅者
涉及专业名词:
生产者:生产消息一方,调用方法生成消息存入activeMQ,供消费方处理
消费者:开启监听器,接受到消息后开始进行处理消息操作
一、windows下active的安装和使用
直接去官网下载即可,加载完成后解压
进入
bin\win64
目录下,双击activemq.bat命令开启activemq
启动完成后就可以直接使用,另外也可以通过activemq提供的后台可以直接管理队列,
启动完成后打开浏览器输入http://127.0.0.1:8161,用户名密码是都是admin
二、简单消息队列测试
1)生产者
2)消费者
三、SSM下基于xml配置使用消息队列
注意本例子是在SSM环境下安装的,所以这里需要提前搭建SSM环境
1)添加jar包
<!--消息队列相关jar-->
<!--jms:Java消息服务(Java Message Service)应用程序接口-->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<!--activemq jar-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>
<!--slf4j日志处理-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
2)生产者配置
配置spring.xml
添加约束
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
添加配置
<!--消息队列-->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!--activeMQ配置-->
<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<!--缓存消息队列,增加性能-->
<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
>
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!--缓存数量-->
<property name="sessionCacheSize" value="10"></property>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!--消息队列名称-->
<constructor-arg value="SSM.MSG"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="amqConnectionFactory" />
<property name="defaultDestination" ref="destination"/>
</bean>
controller发送消息
@Autowired
private JmsTemplate jmsTemplate;
@RequestMapping("sendMsg")
public String sendMsg(){
jmsTemplate.send(new Msg("抢购","12"));
return "list";
}
3)消费者配置
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
>
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="10"></property>
</bean>
<jms:listener-container
container-type="default"
connection-factory="amqConnectionFactory"
acknowledge="auto">
<!--对应处理的类-->
<jms:listener destination="SSM.MSG" ref="msgListener" method="onMessage"></jms:listener>
</jms:listener-container>
监听的类
@Component
public class MsgListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的信息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
4)测试
浏览器地址栏直接访问sendMsg,就会想activeMQ发送数据
四、springBoot使用消息队列
1)添加jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2)生产者配置
properties文件中添加配置
spring.activemq.broker-url=tcp://localhost:61616
#true 表示使用内置的MQ,false则连接服务器
spring.activemq.in-memory=false
spring.activemq.packages.trust-all=true
启动类上加上@EnableJms注解
在conroller中添加发送消息的方法
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 测试activeMQ订阅功能-接收方
*/
@GetMapping("subscribe")
public void subscribec(){
// 创建ActiveMQTopic就是发送订阅
jmsMessagingTemplate.convertAndSend(new ActiveMQTopic("subscribe")
,"订阅");
}
/**
* 测试activeMQ一对一-接收方
*/
@GetMapping("one")
public void onec(){
// 创建ActiveMQQueue就是消息
jmsMessagingTemplate.convertAndSend(new ActiveMQQueue("one")
,"one to one");
}
3)消费者配置
配置文件,jar包和启动类的处理都和生产者一样,唯一需要注意的在控制类中使用
/**
* 测试activeMQ订阅功能-接收方
*/
@JmsListener(destination = "subscribe")
public void subscribe(String msg){
System.out.println("订阅功能:"+msg);
}
/**
* 测试activeMQ一对一-接收方
*/
@JmsListener(destination = "one")
public void one(String msg){
System.out.println("点对点发送功能:"+msg);
}
注意:如果是发送的是订阅模式,必须在生产者和消费者的配置文件中添加
spring.jms.pub-sub-domain=true