感谢尚硅谷
视频地址:尚硅谷ActiveMQ教程(MQ消息中间件快速入门)_哔哩哔哩_bilibili
主要内容
一、消息中间件介绍
二、Java编码实现ActiveMQ通讯
三、JMS规范
四、SpringBoot整合ActiveMQ
五、消息存储和持久化
六、高级特性
一、消息中间件介绍
1、为什么要在系统里引入消息中间件
微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例,这些架构存在以下问题。
1)系统之间接口耦合比较严重
每新增一个下游功能,都要对上游的相关接口进行改造;
举个例子:如果系统A要发送数据给系统B和系统C,发送给每个系统的数据可能有差异,因此系统A对要发送给每个系统的数据进行了组装,然后逐一发送;
当代码上线后又新增了一个需求:
把数据也发送给D,新上了一个D系统也要接受A系统的数据,此时就需要修改A系统,让他感知到D系统的存在,同时把数据处理好再给D。在这个过程你会看到,每接入一个下游系统,都要对系统A进行代码改造,开发联调的效率很低。其整体架构如下图
2)面对大流量并发时,容易被冲垮
每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时,容易被冲垮。
举个例子秒杀业务:
上游系统发起下单购买操作,我就是下单一个操作
下游系统完成秒杀业务逻辑
(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)
3)等待同步存在性能问题
RPC接口上基本都是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。
比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。
根据上述的几个问题,在设计系统时可以明确要达到的目标:
- 要做到系统解耦,当新的模块接进来时,可以做到代码改动最小;能够解耦
- 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能削峰
- 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
2、消息中间件是什么
面向消息的中间件(message-oriented middleware)MOM能够很好的解决以上问题,是指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。
大致的过程是这样的:
发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有关系;
尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。
特点
消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上;
消息接收者则订阅或者监听该爱通道。一条消息可能最终转发给一个或者多个消息接收者,这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。
1)发送者和接受者不必了解对方,只需要确认消息
2)发送者和接受者不必同时在线
能干嘛
解耦,削峰,异步
二、Java编码实现ActiveMQ通讯
1、ActiveMQ端口
ActiveMQ采用61616端口提供JMS服务
ActiveMQ采用8161端口提供管理控制台服务
2、JMS总体架构
Java Message Service
粗说目的地Destination队列(Queue)和主题(Topic)
3、JMS开发基本步骤
- 创建一个connection factory
- 通过connection factory来创建JMS connection
- 启动JMS connection
- 通过JMS connection创建JMS session
- 创建JMS destination(目的地 队列/主题)
- 创建JMS producer或者创建JMS consume并设置destination
- 创建JMS consumer或者注册一个JMS message listener
- 发送(send)或者接收(receive)JMS message
- 关闭所有JMS资源
4、Queue队列模式
消息生产者代码
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用messageProducer生产三个消息发送到MQ的队列
for(int i=0;i<3;i++){
//7.创建消息
//text类型
TextMessage textMessage = session.createTextMessage("mag---" + i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("---消息发布到mq---");
}
消费者代码一(阻塞式消费者)
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
同步阻塞方式((receive())
订阅者或接收者调用MessageConsumer的receive()方法接收信息,receive方法在能够接收到消息之前(或
超时之前)将一直阻塞
*/
while (true){
TextMessage testMessage =(TextMessage) messageConsumer.receive();
if(testMessage!=null){
System.out.println("**消费者接受到信息:"+testMessage.getText());
}else {
break;
}
}
}
messageConsumer.close();
session.close();
connection.close();
}
消费者代码二(异步监听式消费者)
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
异步非阻塞式方式监听器(onMessage)
订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
我们只需要在onMessage方法内判断消息类型即可获取消息
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到text消息:"+(textMessage.getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//保证控制台不灭 不加的话消息没来得及处理程序就结束了
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
控制台说明
Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers=消费者数量,消费者端的消费者数量。
Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
两种消费方式比较
同步阻塞方式
订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器onMessage())
订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
5、Topic队列模式
发布/订阅消息传递域的特点如下:
- 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
- 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
- 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅
生产者代码
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String TOPIC_NAME = "topic-atguigu";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//6.通过使用messageProducer生产三个消息发送到MQ的队列
for(int i=0;i<3;i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("---TOPIC_NAME消息发布到mq---");
}
}
消费者代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String TOPIC_NAME = "topic-atguigu";
public static void main(String[] args) throws JMSException, IOException {
/*
edit configurations --> allow parallel run 一个程序同时启动多个实例
*/
System.out.println("----------消费者1---------------");
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
/*
异步非阻塞式方式监听器(onMessage)
订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
我们只需要在onMessage方法内判断消息类型即可获取消息
*/
messageConsumer.setMessageListener(message -> {
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到消息:"+((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();//保证控制台不灭 不加的话消息没来得及处理程序就结束了
messageConsumer.close();
session.close();
connection.close();
}
}
6、两种模式比较
比较项目 |
工作模式 |
有无状态 |
传递完整性 |
处理效率 |
Queue队列 |
“负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息 |
Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储 |
消息不会被丢弃 |
由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的 |
模式Topic队列模式 |
“订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息 |
无状态 |
如果没有订阅者,消息会被丢弃 |
由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 |
三、JMS规范
1、什么是Java消息服务
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
2、JMS的组成结构和特点
JMS Provider
实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器
JMS Producer
消息生产者,创建和发送JMS消息的客户端应用
JMS Consumer
消息消费者,接收和处理JMS消息的客户端应用
JSM Message
1. 消息头
1)JMSDestination:消息发送的目的地,主要是指Queue和Topic
2)JMSDeliveryMode:持久模式和非持久模式。
一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
3)JMSExpiration:可以设置消息在一定时间后过期,默认是永不过期
消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
4)JMSPriority:消息优先级
从0-9十个级别,0-4是普通消息5-9是加急消息。
JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
5)JMSMessageID:唯一标识每个消息的标识由MQ产生。
2. 消息体:封装具体的消息数据
5种消息格式:
1)TxtMessage:普通字符串消息,包含一个String
2)MapMessage:一个Map类型的消息,key为Strng类型,而值为Java基本类型
3)BytesMessage:二进制数组消息,包含一个byte[]
4)StreamMessage:Java数据流消息,用标准流操作来顺序填充和读取
5)ObjectMessage:对象消息,包含一个可序列化的Java对象
发送和接收的消息体类型必须一致对应
3. 消息属性
如果需要除消息字段以外的值,那么可以使用消息属性。
识别/去重/重点标注等操作非常有用的方法
3、代码操作
生产者代码
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用messageProducer生产三天消息发送到MQ的队列
for(int i=0;i<3;i++){
//7.创建消息
//text类型
TextMessage textMessage = session.createTextMessage("mag---" + i);
//设置消息属性
textMessage.setStringProperty("z3","vip");
//map类型
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("k1","mapMessage_v1");
//object类型
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(new Student(1,"z3","java"));
//byte类型
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("bytesMessage消息".getBytes());
//stream类型
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("stream类型的消息");
streamMessage.writeString("你好");
streamMessage.writeDouble(3);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
messageProducer.send(mapMessage);
messageProducer.send(objectMessage);
messageProducer.send(bytesMessage);
messageProducer.send(streamMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("---消息发布到mq---");
}
}
@AllArgsConstructor
@Data
@NoArgsConstructor
//对象要可序列化
public class Student implements Serializable{
private int id;
private String name;
private String hobby;
}
消费者代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//从ActiveMQ5.12.2 开始,为了增强这个框架的安全性,ActiveMQ将强制用户配置可序列化的包名
//Student在序列化后,消费者在读取ObjectMessage会报错
factory.setTrustAllPackages(true);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
异步非阻塞式方式监听器(onMessage)
订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
我们只需要在onMessage方法内判断消息类型即可获取消息
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到text消息:"+(textMessage.getText()));
System.out.println(" text消息的属性:"+textMessage.getStringProperty("z3"));
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("接收到map消息:"+mapMessage.getString("k1"));
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage) message;
try {
Student student =(Student) objectMessage.getObject();
System.out.println("接收到object消息:"+student.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof BytesMessage){
BytesMessage bytesMessage = (BytesMessage) message;
try {
byte[] bytes = new byte[1024];
int len = -1;
System.out.print("接收到byte消息: ");
while ((len = bytesMessage.readBytes(bytes))!=-1){
System.out.print(new String(bytes,0,len));
}
System.out.println();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof StreamMessage){
StreamMessage streamMessage = (StreamMessage) message;
try {
System.out.println("接收到stream消息-string:"+streamMessage.readString());
System.out.println("接收到stream消息-string:"+streamMessage.readString());
System.out.println("接收到stream消息-int:"+streamMessage.readDouble());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//保证控制台不灭 不加的话消息没来得及处理程序就结束了
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
4、JMS的可靠性
1)PERSISTENT:持久性
参数设置说明
1)非持久化:当服务器宕机,消息不存在。
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
2)持久化:当服务器宕机,消息依然存在。
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
持久的Queue
这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
MessageProducer messageProducer = session.createProducer(queue);
//设置通过session创建出来的生产者生产的Queue消息为持久性
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
持久的Topic
持久的发布主题生产者代码
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String TOPIC_NAME = "topic-atguigu";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
//持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for(int i=0;i<3;i++){
TextMessage textMessage = session.createTextMessage("TOPIC_NAME_PERSIST---" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("---TOPIC_NAME消息发布到mq---");
}
}
持久的订阅主题消费者代码
1)一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题
2)然后运行生产者发送消息,此时无论消费者是否在线,都会接收到,下次连接的时候,会把没有收过的消息都接收下来
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String TOPIC_NAME = "topic-atguigu";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.setClientID("z3");
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.通过session创建持久化订阅
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark...");
//6.启动连接
connection.start();
//7.接受消息
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化的topic: " + textMessage.getText());
message = topicSubscriber.receive(5000L);
}
session.close();
connection.close();
}
}
2)Transaction:事务
producer是否开启事务的代码规范
1)false
只要执行send,就进入到队列中
关闭事务,那第2个签收参数的设置需要有效
2)true
先执行send再执行commit,消息才被真正提交到队列中
消息需要需要批量提交,需要缓冲处理
事务偏生产者/签收偏消费者
生产者代码
public class JmsProduce {
//写法参照源码
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用messageProducer生产三天消息发送到MQ的队列
for(int i=0;i<3;i++){
//7.创建消息
//text类型
TextMessage textMessage = session.createTextMessage("mag---" + i);
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
//不加commit消息发送不出去
session.commit();
session.close();
connection.close();
System.out.println("---消息发布到mq---");
}
}
消费者代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
TextMessage testMessage =(TextMessage) messageConsumer.receive(4000L);
if(testMessage!=null){
System.out.println("**消费者接受到信息:"+testMessage.getText());
}else {
break;
}
}
messageConsumer.close();
//不加commit消息会被重复消费
session.commit();
session.close();
connection.close();
}
}
3)Acknowledge:签收
非事务(允许重复消息)
1)自动签收(默认)
Session.AUTO_ACKNOWLEDGE
2)手动签收
Session.CLIENT_ACKNOWLEDGE
客户端调用acknowledge方法手动签收
生产者代码不变,消费者手动签收代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//手动
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
TextMessage testMessage =(TextMessage) messageConsumer.receive(3000L);
//签收
testMessage.acknowledge();
if(testMessage!=null){
System.out.println("**消费者接受到信息:"+testMessage.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}
事务
生产事务开启,只有commit后才能将全部消息变为已消费
签收和事务的关系(事务>签收)
- 在事务性会话中,当一个事务被成功提交则消息被自动签收,如果事务回滚,则消息会被再次传送。
- 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
5、JMS点对点总结
点对点模型是基于队列的,生产者发送消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
- 如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收
- 队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势
6、JMS发布订阅总结
非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
持久订阅
客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息
非持久订阅状态下,不能恢复或重新派送一个未签收的消息。
持久订阅才能恢复或重新派送一个未签收的消息。
使用
当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅
三、Broker
1、介绍
相当于一个ActiveMQ服务器实例
说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。
2、代码操作
用ActiveMQ Broker作为独立的消息服务器来构建Java应用。
ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用。
//迷你版的ActiveMQ
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支持在vm中通信基于嵌入式的broker
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
}
}
生产者代码
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用messageProducer生产消息发送到MQ的队列
for(int i=0;i<3;i++){
//7.创建消息
//text类型
TextMessage textMessage = session.createTextMessage("mag---" + i);
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("---消息发布到mq---");
}
}
消费者代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.获得连接connection并启动
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数:①事务 ②签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
TextMessage testMessage =(TextMessage) messageConsumer.receive(3000L);
if(testMessage!=null){
System.out.println("**消费者接受到信息:"+testMessage.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}
四、SpringBoot整合ActiveMQ
1、准备
pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
2、Queue队列模式
生产者
yml文件
# web占用的端口
server:
port: 7777
spring:
activemq:
# activemq的broker的url
broker-url: tcp://192.168.16.106:61616
# 连接activemq的broker所需的账号和密码
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认) = queue true = topic
pub-sub-domain: false
# 自定义队列名称。这只是个常量
myqueue: boot-activemq-queue
configBean类,返回ActiveMQueue
@Component
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
生产者代码
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue,"******: "+ UUID.randomUUID().toString().substring(0,6));
}
}
主启动类
@SpringBootApplication
@EnableJms
public class Main_Application {
public static void main(String[] args) {
SpringApplication.run(Main_Application.class,args);
}
}
测试代码
@SpringBootTest(classes = MainApp.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Autowired
private Queue_Producer queue_producer;
@Test
public void testSend() {
queue_producer.producerMsg();
}
}
消费者
yml文件
server:
port: 8888
spring:
activemq:
# activemq的broker的url
broker-url: tcp://192.168.16.106:61616
# 连接activemq的broker所需的账号和密码
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认) = queue true = topic
pub-sub-domain: false
# 自定义队列名称。这只是个常量
myqueue: boot-activemq-queue
消费者代码
@Component
public class Queue_Consume {
//监听过后会随着springboot一起启动,有消息就执行加了该注解的方法
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("*******消费者收到消息: "+textMessage.getText());
}
}
添加功能,实现每隔3秒钟,往MQ推送消息
生产者代码添加以下
//间隔时间3秒钟定投
//直接启动主启动类即可
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled(){
jmsMessagingTemplate.convertAndSend(queue,"******Scheduled: "+ UUID.randomUUID().toString().substring(0,6));
System.out.println("*******Scheduled send ok 3s");
}
主启动类添加如下注释
//是否开启定时任务调度功能
@EnableScheduling
3、Topic队列模式
生产者
yml文件
server:
port: 8888
spring:
activemq:
# activemq的broker的url
broker-url: tcp://192.168.16.106:61616
# 连接activemq的broker所需的账号和密码
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认) = queue true = topic
pub-sub-domain: true
# 自定义队列名称。这只是个常量
mytopic: boot-activemq-topic
configBean类,返回ActiveMQTopic
@Component
public class ConfigBean {
@Value("${mytopic}")
private String mytopic;
@Bean
public Topic topic(){
return new ActiveMQTopic(mytopic);
}
}
生产者代码
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(topic,"******: "+ UUID.randomUUID().toString().substring(0,6));
}
}
消费者
pom文件
server:
port: 7766 #启动第二个消费者将端口改为7766
spring:
activemq:
# activemq的broker的url
broker-url: tcp://192.168.16.106:61616
# 连接activemq的broker所需的账号和密码
user: admin
password: admin
jms:
# 目的地是queue还是topic, false(默认) = queue true = topic
pub-sub-domain: true
# 自定义队列名称。这只是个常量
mytopic: boot-activemq-topic
消费者代码(非持久化订阅)
@Component
public class Topic_Consume {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("*******消费者收到消息: "+textMessage.getText());
}
}
配置Bean(持久化订阅)
配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
@Component
@EnableJms
public class ActiveMQConfigBean {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(user);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name = "jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());
defaultJmsListenerContainerFactory.setSubscriptionDurable(true);
defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");
return defaultJmsListenerContainerFactory;
}
}
消费者代码
@Component
public class Topic_Consumer {
//需要在监听方法指定连接工厂
@JmsListener(destination = "${mytopic}",containerFactory = "jmsListenerContainerFactory")
public void consumer(TextMessage textMessage) throws JMSException {
System.out.println("*******消费者收到消息" + textMessage.getText());
}
}
五、消息存储和持久化
1、是什么
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。
ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
简单来说,就是ActiveMQ宕机了,消息不会丢失的机制
2、有哪些方式
1)AMQ Mesage Store(了解)
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本
2)KahaDB消息存储(默认)
基于日志文件,从ActiveMQ5.4开始默认的持久化插件
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
存储原理
(1)db-number.log
KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
(2)db.data
该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。
(3)db.free
当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID,在后面建索引的时候从空闲页开始,保证索引的连续性,减少空闲碎片。
(4)db.redo
用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。
(5)lock
文件锁,表示当前kahadb独写权限的broker。
3)JDBC消息存储
实现方式:
1. 添加mysql数据库的驱动包到lib文件夹
mysql-connector-java-5.1.49.jar
2. 修改activemq.xml配置文件
dataSource是指定将要引用的持久化数据库的bean名称。
createTableOnStartup是否在启动的时候创建数据库表,默认是true,这样每次启
动都会去创建表了,一般是第一次启动的时候设置为true,然后再去改成false。
<jdbcPersistenceAdapter dataSource="#mysql-ds createTableOnStartup=false"/>
本地数据库要设置所有的ip都可以连接
也可以使用其他的数据库驱动,但是要在lib目录下引入相应的包
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://主机ip:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="12345"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
3. 配置成功的话,本地数据库mysql-ds下会有三张表
消息表:ACTIVEMQ_MSGS
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
存储订阅关系:ACTIVEMQ_ACKS
记录哪个Broker是当前的Master Broker:ACTIVEMQ_LOCK
表在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份,等待Master Broker不可用,才可能成为下一个Master Broker。
4. 代码验证
注意:一定要开启持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
(1)queue队列模式
生产者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue= session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
//持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
for(int i=0;i<3;i++){
TextMessage textMessage = session.createTextMessage("jdbc msg---" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("---jdbc消息发布到mq---");
}
}
消费者
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到text消息:" + (textMessage.getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
(2)topic队列模式
生产者
public class JmsProduce {
private static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
private static final String ACTIVEMQ_TOPIC_NAME = "Topic-JdbcPersistence";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i<3;i++){
TextMessage textMessage = session.createTextMessage("Topic-JdbcPersistence测试消息" + i);
messageProducer.send(textMessage);
}
System.out.println("主题发送到MQ完成");
messageProducer.close();
session.close();
connection.close();
}
}
消费者
public class JmsConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
private static final String ACTIVEMQ_TOPIC_NAME = "Topic-JdbcPersistence";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("消费者1号");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
TopicSubscriber subscriber = session.createDurableSubscriber(topic,"mq-jdbc");
connection.start();
subscriber.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者收到的消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
}
}
5. 结论
queue队列模式:
- 在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除消费过的消息
topic队列模式:
- 一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs,
- 在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。
4)JDBC Message store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:
生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
配置
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data" />
</persistenceFactory>
总结
以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用
5)LevelDB消息存储(了解)
这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引
3、总结
ActiveMQ消息持久化机制有:
AMQ | 基于日志文件 |
KahaDB | 基于日志文件,从ActiveMQ5.4开始默认使用 |
JDBC | 基于第三方数据库 |
Replicated LevelDB Store | 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。 |
六、高级特性
1、引入消息队列之后该如何保证其高可用性
zookeeper+Replicated LevelDB
2、异步投递Async Sends
1)是什么
ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer,直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
2)特点
它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加;
此外它不能有效的确保消息的发送成功。在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。
3)如何确保发送成功
异步发送丢失消息的场景是:生产者设置userAsyncSend=true,使用producer.send(msg)持续发送消息。
如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
所以,正确的异步发送方法是需要接收回调的。
同步发送和异步发送的区别就在此,同步发送等send不阻塞了就表示一定发送成功了,异步发送需要客户端回执并由客户端再判断一次是否发送成功。
4)代码实现
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue_cluster";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//设置异步发送
factory.setUseAsyncSend(true);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
//创建消息的生产者
ActiveMQMessageProducer messageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
//通过使用messageProducer生产消息发送到MQ的队列
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("message---" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"------orderAtguigu");
String msgId = textMessage.getJMSMessageID();
//接收回调,判断消息是否发送成功
messageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(msgId + "已经发送成功");
}
@Override
public void onException(JMSException exception) {
System.out.println(msgId + "发送失败");
}
});
}
messageProducer.close();
session.close();
connection.close();
System.out.println("---消息发布到mq---");
}
}
3、延迟投递和定时投递
1)配置
broker标签添加scheduleSupport属性
<broker schedulerSupport = "true">
2)代码实现
public class JmsProdece {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue_delay";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
//延迟投递时间
long delay = 3 * 1000;
//重复投递间隔时间
long period = 4 *1000;
//重复次数
int repeat = 5;
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("message---" + i);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("---消息发布到mq---");
}
}
4、ActiveMQ消息重试机制
1)哪些情况会引发消息重发
- 消费者用了事务且在session中调用了rollback
- 消费者用了事务且在调用commit之前关闭或者没有commit
- 消费者在CLIENT_ACKNOWLEDGE(需要手动签收)的传递模式下,session中调用了recover进行重试。
2)消息重发时间间隔和重发次数
间隔:1
次数:6
3)有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。
4)代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
public static final String QUEUE_NAME = "queue_repeat";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//重发策略
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
//重发次数最大为3
redeliveryPolicy.setMaximumRedeliveries(3);
factory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = factory.createConnection();
connection.start();
//使用事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage){
TextMessage testMessage = (TextMessage) message;
try {
System.out.println("收到的消息:"+testMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
//session.commit(); 重复消费
messageConsumer.close();
session.close();
connection.close();
}
}
5、如果保证消息不被重复消费呢?幂等性问题
1.如果消息是做数据库插入操作,给这个消息做一个唯一主键,就算出现重复消费的情况,会导致主键冲突,避免数据库出现脏数据。
2.准备第三方服务方做消费记录,以redis为例,给消息分配一个全局id,只要消费过这个消息,将<id,message>以K-V形式写入redis,那消费者开始消费前,先去redis中查询消费记录。