一 RibbitMQ
RabbitMQ 是一个开源的 AMQP(高级消息队列协议) 实现,服务器端用Erlang语言编写.
相关概念
持久化
RabbitMQ的持久化是在消费确认前的持久,在消费确认后还是会删除。
虚拟主机
一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。
交换机
Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
- Direct:(默认)“先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.可以多个队列绑定同一个交换机,但是在同一时间只有一个队列能拿到消息。
- Topic:按规则转发消息(最灵活)。根据通配符来转发,其他和direct类似。*可以匹配句点之间的字符,#可以匹配任意字符,包括句点。
- Headers:设置 header attribute 参数类型的交换机,根据消息中的headers属性和队列与交换机绑定时候设定的键值对规则匹配,x-match参数可以有any和all,any是只要有一组键值对匹配上就会发送消息到相应队列,all则需所有键值对都匹配上才可以。
- Fanout:转发消息到所有绑定队列,不管所有规则,只要绑定了就会发送。
注
headers表越大性能越差
绑定
也就是交换机需要和队列相绑定,是多对多的关系。
Default Exchange
rabbitTemplate.convertAndSend(“hello rabbit”);这种没有路由健的消息会丢失,不会给到任何queue --> mandatory
rabbitTemplate.convertAndSend(“simpleQueue”, “hello simple”); 指定了routing key 发送给指定的queue
rabbitTemplate.convertAndSend(“testQueue”, “hello testQueue”); 如果同一个queue有多个消费者 会按prefetch均匀分配
properties属性

消息发布的性能权衡
- 平衡投递速度和可靠投递
- 使用mandatory设置,正确投递不会有相应,错误投递会抛异常。(待确认)
- 发布者确认,不与事务一起工作,是事务的轻量化代替方案
- 备用交换器,错误的消息会被你设置的路由到死信队列(dead-letter)如果此时设置了mandatory标志也不会给发布者返回信息,而是被认为正确投递。exhange的alternate-exchange
- HA队列(集群模式)来避免单点故障
- delivery-mode=2来持久化消息,性能影响比较大;可以增加RAM,以便将磁盘页面保留在内核磁盘缓存中。
- TCP背压和连接阻塞,阻止单点发布者的恶意请求或者异常(太多)请求
消息接收的性能权衡
- 尽量不使用主动拉取,而是使用监听
- 使用no-ack会提高性能,却会丢失消息,权衡点:prefetch的大小
- QoS,服务质量,调整预取值到合适
- 一次确认多条消息性能比单条提升1.5倍(在预取量达到1000及以上)
- 事务,不适用于金庸确认的消费者,影响性能不过在不设置qos时,使用事务批量确认消息时性能有略微提升
- reject-拒绝单条消息可以设置redelivered重新投递或者丢弃,Nack批量不确认
- 死信交换器,过期或者被拒绝的消息,当作queue的x-dead-letter-exchange参数传入即可
- 自动删除队列auto-delete=true,单消费者队列exclusivetrue,自动过期队列x-expires=毫秒
队列的参数设置
消费顺序
为了保证消费顺序,rabbitmq可以使用queue和consumer一一对应的关联,虽然queue多了一点但是利用了queue的队列达到了消息的顺序行,在consumer内部也可以维护一个队列来保证顺序。–这样影响了性能
RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:
// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。
private SendResult send() {
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根据我们的算法,选择一个发送队列
// 这里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
二 ActiveMQ
消费顺序
- 利用Activemq的高级特性:consumer之独有消费者(exclusive consumer) 即独占消息在ActiveMQ4.x中可以采用Exclusive Consumer,broker会从queue中,一次发送消息给一个消费者,这样就避免了多个消费者并发消费的问题,从而保证顺序
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
- 利用Activemq的高级特性:messageGroups 相同GroupId的消息有先发给同一个消费者 达到了顺序和并行的兼容(同一组消息的顺序,不同组的并行)
bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
bytesMessage.setIntProperty("JMSXGroupSeq", -1);
JmsTemplate 类
这个类有一个@Nullable private Object defaultDestination; 存放defaultDestination或者defaultDestinationName
有两套getset方法,一套是Destination一套是String。
@Nullable private MessageConverter messageConverter; 存放messageConverter。
无参构造函数
public JmsTemplate() {
this.transactionalResourceFactory = new JmsTemplate.JmsTemplateResourceFactory();
this.messageIdEnabled = true;
this.messageTimestampEnabled = true;
this.pubSubNoLocal = false;
this.receiveTimeout = 0L;
this.deliveryDelay = -1L;
this.explicitQosEnabled = false;
this.deliveryMode = 2;
this.priority = 4;
this.timeToLive = 0L;
this.initDefaultStrategies(); //this.setMessageConverter(new SimpleMessageConverter());
}
send方法 JmsTemplate.doSend->ActiveMQMessageProducer.send->ActiveMQSession.send->ActiveMQConnection.syncSendPacket
protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
MessageProducer producer = this.createProducer(session, destination);
try {
Message message = messageCreator.createMessage(session);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Sending created message: " + message);
}
this.doSend(producer, message);
if (session.getTransacted() && this.isSessionLocallyTransacted(session)) {
JmsUtils.commitIfNecessary(session);
}
} finally {
JmsUtils.closeMessageProducer(producer);
}
}
receice方法
@Nullable
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
Message var8;
try {
long timeout = this.getReceiveTimeout();
ConnectionFactory connectionFactory = this.getConnectionFactory();
JmsResourceHolder resourceHolder = null;
if (connectionFactory != null) {
resourceHolder = (JmsResourceHolder)TransactionSynchronizationManager.getResource(connectionFactory);
}
if (resourceHolder != null && resourceHolder.hasTimeout()) {
timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
}
Message message = this.receiveFromConsumer(consumer, timeout);
if (session.getTransacted()) {
if (this.isSessionLocallyTransacted(session)) {
JmsUtils.commitIfNecessary(session);
}
} else if (this.isClientAcknowledge(session) && message != null) {
message.acknowledge(); //这就是为什么你设置了手动确认还是会自动ack消息的原因
}
var8 = message;
} finally {
JmsUtils.closeMessageConsumer(consumer);
}
return var8;
}
------------------receive方法(receiveFromConsumer的实现)------------
public Message receive(long timeout) throws JMSException {
this.checkClosed(); //确认消费者通道没有关闭
this.checkMessageListener(); //确保没有已经存在的监听者Cannot synchronously receive a message when a MessageListener is set
if (timeout == 0L) {
return this.receive();
} else {
this.sendPullCommand(timeout);
if (timeout > 0L) {
MessageDispatch md;
if (this.info.getPrefetchSize() == 0) {
md = this.dequeue(-1L);
} else {
md = this.dequeue(timeout);
}
if (md == null) {
return null;
} else {
this.beforeMessageIsConsumed(md);
this.afterMessageIsConsumed(md, false);
return this.createActiveMQMessage(md);
}
} else {
return null;
}
}
}
pub-sub-domain: true
这句配置设置为true时 只有topic起作用, false时候只有queue起作用, 默认false
想要queue和topic同时起作用,需要配置两个监听工厂,一个setPubSubDomain(false),一个setPubSubDomain(true)
acknowledge-mode:
dups_ok: 批量确认
auto:自动确认
client: 客户端手动确认
question0:
在client模式下,我没有手动确认消息,为什么服务端还是出队了
answer0:
当我们关系事务,并且设置接受模式为CLIENT_ACKNOWLEDGE,发现并没有起作用。原因如下(源码自动转为自动确认了):
//org.springframework.jms.listener.AbstractMessageListenerContainer
protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException {
if (session.getTransacted()) {
if (this.isSessionLocallyTransacted(session)) {
JmsUtils.commitIfNecessary(session);
}
} else if (message != null && this.isClientAcknowledge(session)) {
message.acknowledge();
}
}
目前可以使用一下配置达到手动确认的目的:
@Bean
public JmsListenerContainerFactory<?> ackQueueListener(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(4);
factory.setConnectionFactory(connectionFactory);
return factory;
}
因为程序没有异常,在方法执行完后还是会自动ack,我们显示抛出一个异常就可以了。重试几次默认是6后会加入ActiveMQ.DLQ。
客户端成功接收一条消息的标志是一条消息被签收,成功应答。
消息的签收情形分两种:
1、带事务的session
如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
2、不带事务的session
不带事务的session的签收方式,取决于session的配置。
Activemq支持一下三种模式:
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
question1:
两个消费者时平分消息数量 一个延迟一秒处理结束 一个无延迟处理结束 100条消息一人处理50条 一个结束了另一个慢慢处理
answer1:
Broker发送消息给消费者,消费者在处理结束后会发送ack反馈给broker。为了提高消息分发的效率,引入了预取策略。即,Broker在未得到消费者的ack反馈之前,会继续发送新消息给它,除非消费者的消息缓存区已满,或是未收到反馈的消息数达到了prefetch上限。
需要注意的是,消息被prefetch后,仍然会在ActiveMQ的控制台里处于Pending状态——直到它被实际消费,Broker收到了反馈,才会认为其Dequeued.
明白了这个道理,就知道了上面提到的“消息似乎被预先指定给各个消费者”的原因了——因为消息都被prefetch了。这个时候,即便有新的消费者加入,它也没办法处理别人已经prefetch的消息
ActiveMQ生产者和消费者优化策略
生产者
默认情况下,ActiveMQ服务端认为生产者端发送的是PERSISTENT Message。所以如果要发送NON_PERSISTENT Message,那么生产者端就要明确指定。发送NON_PERSISTENT Message时,消息发送方默认使用异步方式:即是说消息发送后发送方不会等待NON_PERSISTENT Message在服务端的任何回执。为避免MQ消息堆积但发送方不知道无法采取策略的情况,消息发送者会在发送一定大小的消息后等待服务端进行回执,可以通过代码设置回执点或者设置每次都等待服务端回执。connectionFactory.setProducerWindowSize(102400); 设置消息发送者在累计发送102400byte大小的消息后(可能是一条消息也可能是多条消息)
等待服务端进行回执,以便确定之前发送的消息是否被正确处理,确定服务器端是否产生了过量的消息堆积,需要减慢消息生产端的生产速度。
如果您不特意指定消息的发送类型,那么消息生产者默认发送PERSISTENT Meaage。这样的消息发送到ActiveMQ服务端后将被进行持久化存储(比较耗时),并且消息发送者默认等待ActiveMQ服务端对这条消息处理情况的回执。为了提高ActiveMQ在接受PERSISTENT Meaage时的性能,ActiveMQ允许开发人员遵从JMS API中的设置方式,为消息发送端在发送PERSISTENT Meaage时提供异步方式,connectionFactory.setUseAsyncSend(true);此时要设置回执点。
JMS规范中支持带事务的消息,也就是说您可以启动一个事务(并由消息发送者的连接会话设置一个事务号Transaction ID),然后在事务中发送多条消息。这个事务提交前这些消息都不会进入队列(无论是Queue还是Topic)。
生产流控制,是ActiveMQ消息生产者端最为重要的性能策略,它主要设定了在ActiveMQ服务节点在产生消息堆积,并超过限制大小的情况下,如何进行消息生产者端的限流。在ActiveMQ的主配置文件activemq.xml中,关于ProducerFlowControl策略的控制标签是“destinationPolicy”和它的子标签,可以配置每个队列是否启用生产者流控,以及每个Queue的最大内存限制。有关于policyEntry标签的所有配置选项都有完整说明:http://activemq.apache.org/per-destination-policies.html。
spring.activemq.template.delivery-mode: persistent/non_persistent 持久化存储/非持久化存储
发送NON_PERSISTENT Message时,消息发送方默认使用异步方式:即是说消息发送后发送方不会等待NON_PERSISTENT Message在服务端的任何回执
这时候最好设置回值大小,即在消息大小达到设定值时,服务端返回一个回执。connectionFactory.setProducerWindowSize(102400);
如果在异步情况下,需要每次都回执connectionFactory.setAlwaysSyncSend(true);
如果您不特意指定消息的发送类型,那么消息生产者默认发送PERSISTENT Meaage。这样的消息发送到ActiveMQ服务端后将被进行持久化存储(持久化存储方案将在后文进行详细介绍),并且消息发送者默认等待ActiveMQ服务端对这条消息处理情况的回执。
以上这个过程非常耗时,ActiveMQ服务端不但要接受消息,在内存中完成存储,并且按照ActiveMQ服务端设置的持久化存储方案对消息进行存储(主要的处理时间耗费在这里)。为了提高ActiveMQ在接受PERSISTENT Meaage时的性能,ActiveMQ允许开发人员遵从JMS API中的设置方式,为消息发送端在发送PERSISTENT Meaage时提供异步方式:
connectionFactory.setUseAsyncSend(true); 一旦这么做了就必须设置回执窗口 connectionFactory.setProducerWindowSize(102400);
消费者
比起消息生产者来说消息消费者的性能更能影响ActiveMQ系统的整体性能,因为要成功完成一条消息的处理,它的工作要远远多于消息生产者。默认情况下ActiveMQ服务端采用异步方式向客户端推送消息。也就是说ActiveMQ服务端在向某个消费者会话推送消息后,不会等待消费者的响应信息,直到消费者处理完消息后,主动向服务端返回处理结果。
ActiveMQ系统中,默认的策略是ActiveMQ服务端一旦有消息,就主动按照设置的规则推送给当前活动的消费者。其中每次推送都有一定的数量限制,这个限制值就是prefetchSize。针对Queue工作模型的队列和Topic工作模型的队列,ActiveMQ有不同的默认“预取数量”;针对NON_PERSISTENT Message和PERSISTENT Message,ActiveMQ也有不同的默认“预取数量”:
PERSISTENT Message—Queue:prefetchSize=1000
NON_PERSISTENT Message—Queue:prefetchSize=1000
PERSISTENT Message—Topic:prefetchSize=100
NON_PERSISTENT Message—Topic:prefetchSize=32766
ActiveMQ中设置的各种默认预取数量一般情况下不需要进行改变。但是非必要情况下,请不要设置prefetchSize=1,因为这样就是一条一条的取数据;也不要设置为prefetchSize=0,因为这将导致关闭服务器端的推送机制,改为客户端主动请求。
JMS规范除了为消息生产者端提供事务支持以外,还为消费服务端准备了事务的支持。您可以通过在消费者端操作事务的commit和rollback方法,向服务器告知一组消息是否处理完成。采用事务的意义在于,一组消息要么被全部处理并确认成功,要么全部被回滚并重新处理。
如果一条消息被不断的处理失败,那么最可能的情况就是这条消息承载的业务内容本身就有问题。那么无论重发多少次,这条消息还是会处理失败。为了解决这个问题,ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入“死信队列”。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。默认情况下“死信队列”只接受PERSISTENT Message,如果NON_PERSISTENT Message超过了重发上限,将直接被删除。
jmsTemplate
jmsTemplate.sendAndReceive:阻塞接收来自消费者的消息
消息类型
JMS API 定义了5种消息体格式,也叫消息类型,可以使用不同形式发送接收数据并可以兼容现有的消息格式,下面描述这5种类型:
- TextMessage:java.lang.String对象,如xml文件内容。
- MapMessage:key/value键值对的集合,key是String对象,值类型可以是Java任何基本类型。
- BytesMessage:字节流。
- StreamMessage:Java 中的输入输出流。
- ObjectMessage:Java中的可序列化对象。
另外,还有一种是Message,没有消息体,只有消息头和属性。
- 对应的object必须实现Serializable接口。
- 必须配置 spring.active.packages.trust-all: true;或者设置连接工厂的
- 接收端使用如下方法接受
if (message instanceof ActiveMQObjectMessage) { ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message; Car car = (Car) objectMessage.getObject(); Gson gson = new Gson(); log.info("i have a car!!!!! --->{}", gson.toJson(car)); }
- MapMessage:Only objectified primitive objects, String, Map and List types are allowed
- BytesMessage.writeBytes(byte b)
- StreamMesage携带了一个Java原始数据类型流作为有效负载。它提供了一套将格式化字节流映射为Java原始数据类型的简便方法。
StreamMessage保持了写入流时的顺序和原始数据类型,因此它适用于形式转换规则。
对比
