RabbitMQ延时队列实现
1.1 消息的TTL
消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒 RabbitMQ 可以对队列和消息分别设置TTL
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果
1.2 死信:Dead Letter Exchange(DLX)
一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列
1 消息被拒绝,并且设置 requeue 参数为 false
2 上面的消息的TTL到了,消息过期了,而下面的例子就是使用ttl这个条件来实现延时队列
3 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。
DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。要为某个队列添加 DLX,需要在创建这个队列的时候设置其deadLetterExchange 和 deadLetterRoutingKey 参数,deadLetterRoutingKey 参数可选,表示为 DLX 指定的路由键,如果没有特殊指定,则使用原队列的路由键,所以当队列 myFirstQueue 中有消息成为死信后就会被发布到 myDeadLetterEx 中去python代码实现
import pika, json, time, threading
class RabbitMQClient(object):
# 加锁,防止并发较高时,同时创建对象,导致创建多个对象
_singleton_lock = threading.Lock()
def __init__(self, conn_str='amqp://user:password@ip:5672/%2F'):
"""__new__创建对象后初始化对象"""
self.exchange_type = "direct"
self.connection = pika.BlockingConnection(pika.URLParameters(conn_str))
self.channel = self.connection.channel()
self._declare_retry_queue()
def __new__(cls):
"""__new__用来创建对象"""
if not hasattr(RabbitMQClient, "_instance"):
with RabbitMQClient._singleton_lock:
if not hasattr(RabbitMQClient, "_instance"):
RabbitMQClient._instance = super().__new__(cls)
return RabbitMQClient._instance
def close_connection(self):
self.connection.close()
def declare_exchange(self, exchange):
"""创建交换器
durable:交换器持久化
"""
self.channel.exchange_declare(exchange=exchange,exchange_type=self.exchange_type,durable=True)
def declare_queue(self, queue):
"""创建队列
durable: 队列持久化
"""
self.channel.queue_declare(queue=queue,durable=True)
def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
"""
创建延迟队列
:param TTL: ttl的单位是us,ttl=60000 表示 60s
:param queue:
:param DLX:死信转发的exchange
:return:
"""
arguments={}
if DLX:
#设置死信转发的exchange
arguments[ 'x-dead-letter-exchange']=DLX
if TTL:
arguments['x-message-ttl']=TTL # 消息时间
print(arguments)
self.channel.queue_declare(queue=queue,durable=True,arguments=arguments)
def _declare_retry_queue(self):
"""
创建异常交换器和队列,用于存放没有正常处理的消息。
:return:
"""
self.channel.exchange_declare(exchange='RetryExchange',
exchange_type='fanout',
durable=True)
self.channel.queue_declare(queue='RetryQueue', durable=True)
self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
"""
发送消息到指定的交换器
:param exchange: RabbitMQ交换器
:param msg: 消息实体,是一个序列化的JSON字符串
:return:
"""
if delay==0:
self.declare_queue(routing_key)
else:
self.declare_delay_queue(routing_key,TTL=TTL)
if exchange!='':
self.declare_exchange(exchange)
self.channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2,
type=exchange
))
self.close_connection()
print("message send out to %s" % exchange)
def start_consume(self,callback,queue='#',delay=1):
"""
启动消费者,开始消费RabbitMQ中的消息
:return:
"""
if delay==1:
queue='RetryQueue'
else:
self.declare_queue(queue)
self.channel.basic_qos(prefetch_count=10) # 队列长度10
try:
self.channel.basic_consume( # 消费消息
queue, # 你要从那个队列里收消息
callback, # 如果收到消息,就调用callback函数来处理消息
)
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop_consuming()
def stop_consuming(self):
"""关闭"""
self.channel.stop_consuming() # 消费者停止监听队列
self.close_connection() # 断开连接
def message_handle_successfully(channel, method):
"""
如果消息处理正常完成,必须调用此方法,
否则RabbitMQ会认为消息处理不成功,重新将消息放回待执行队列中
:param channel: 回调函数的channel参数
:param method: 回调函数的method参数
:return:
"""
channel.basic_ack(delivery_tag=method.delivery_tag)
def message_handle_failed(channel, method):
"""
如果消息处理失败,应该调用此方法,会自动将消息放入异常队列
:param channel: 回调函数的channel参数
:param method: 回调函数的method参数
:return:
"""
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
if __name__ == '__main__':
# 发布者代码
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-test', msg1, delay=1, TTL=60000)
print("message send out")
time.sleep(3)
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
msg = body.decode()
print(msg)
# 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。
RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback, queue_name, delay=0)