rabbitmq在python中实现延时队列(库存回滚)

  1. RabbitMQ延时队列实现
    1.1 消息的TTL
    消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒 RabbitMQ 可以对队列消息分别设置TTL
    对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
    如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果
    1.2 死信:Dead Letter Exchange(DLX)
    一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列
    1 消息被拒绝,并且设置 requeue 参数为 false
    2 上面的消息的TTL到了,消息过期了,而下面的例子就是使用ttl这个条件来实现延时队列
    3 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。
    DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。要为某个队列添加 DLX,需要在创建这个队列的时候设置其deadLetterExchange 和 deadLetterRoutingKey 参数,deadLetterRoutingKey 参数可选,表示为 DLX 指定的路由键,如果没有特殊指定,则使用原队列的路由键,所以当队列 myFirstQueue 中有消息成为死信后就会被发布到 myDeadLetterEx 中去

  2. python代码实现

import pika, json, time, threading


class RabbitMQClient(object):
    # 加锁,防止并发较高时,同时创建对象,导致创建多个对象
    _singleton_lock = threading.Lock()

    def __init__(self, conn_str='amqp://user:password@ip:5672/%2F'):
        """__new__创建对象后初始化对象"""
        self.exchange_type = "direct"
        self.connection = pika.BlockingConnection(pika.URLParameters(conn_str))
        self.channel = self.connection.channel()
        self._declare_retry_queue()

    def __new__(cls):
        """__new__用来创建对象"""
        if not hasattr(RabbitMQClient, "_instance"):
            with RabbitMQClient._singleton_lock:
                if not hasattr(RabbitMQClient, "_instance"):
                    RabbitMQClient._instance = super().__new__(cls)
        return RabbitMQClient._instance

    def close_connection(self):
        self.connection.close()

    def declare_exchange(self, exchange):
        """创建交换器
        durable:交换器持久化
        """

        self.channel.exchange_declare(exchange=exchange,exchange_type=self.exchange_type,durable=True)

    def declare_queue(self, queue):
        """创建队列
        durable: 队列持久化
        """

        self.channel.queue_declare(queue=queue,durable=True)

    def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
        """
        创建延迟队列
        :param TTL: ttl的单位是us,ttl=60000 表示 60s
        :param queue:
        :param DLX:死信转发的exchange
        :return:
        """
        arguments={}
        if DLX:
            #设置死信转发的exchange
            arguments[ 'x-dead-letter-exchange']=DLX
        if TTL:
            arguments['x-message-ttl']=TTL # 消息时间
        print(arguments)
        self.channel.queue_declare(queue=queue,durable=True,arguments=arguments)
        
    def _declare_retry_queue(self):
        """
        创建异常交换器和队列,用于存放没有正常处理的消息。
        :return:
        """
        self.channel.exchange_declare(exchange='RetryExchange',
                                      exchange_type='fanout',
                                      durable=True)
        self.channel.queue_declare(queue='RetryQueue', durable=True)
        self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')

    def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
        """
        发送消息到指定的交换器
        :param exchange: RabbitMQ交换器
        :param msg: 消息实体,是一个序列化的JSON字符串
        :return:
        """
        if delay==0:
            self.declare_queue(routing_key)
        else:
            self.declare_delay_queue(routing_key,TTL=TTL)
        if exchange!='':
            self.declare_exchange(exchange)
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=routing_key,
                                   body=msg,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       type=exchange
                                   ))
        self.close_connection()
        print("message send out to %s" % exchange)

    def start_consume(self,callback,queue='#',delay=1):
        """
        启动消费者,开始消费RabbitMQ中的消息
        :return:
        """
        if delay==1:
            queue='RetryQueue'
        else:
            self.declare_queue(queue)
        self.channel.basic_qos(prefetch_count=10) # 队列长度10
        try:
            self.channel.basic_consume(  # 消费消息
                queue,  # 你要从那个队列里收消息
                callback,  # 如果收到消息,就调用callback函数来处理消息
            )
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()

    def stop_consuming(self):
        """关闭"""
        self.channel.stop_consuming() # 消费者停止监听队列
        self.close_connection()  # 断开连接

    def message_handle_successfully(channel, method):
        """
        如果消息处理正常完成,必须调用此方法,
        否则RabbitMQ会认为消息处理不成功,重新将消息放回待执行队列中
        :param channel: 回调函数的channel参数
        :param method: 回调函数的method参数
        :return:
        """
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def message_handle_failed(channel, method):
        """
        如果消息处理失败,应该调用此方法,会自动将消息放入异常队列
        :param channel: 回调函数的channel参数
        :param method: 回调函数的method参数
        :return:
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)



if __name__ == '__main__':

    # 发布者代码
    print("start program")
    client = RabbitMQClient()
    msg1 = '{"key":"value"}'
    client.publish_message('test-test', msg1, delay=1, TTL=60000)
    print("message send out")

    time.sleep(3)
    print("start program")
    client = RabbitMQClient()
    def callback(ch, method, properties, body):
        msg = body.decode()
        print(msg)
        # 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。
        RabbitMQClient.message_handle_successfully(ch, method)

    queue_name = "RetryQueue"
    client.start_consume(callback, queue_name, delay=0)

版权声明:本文为qq_42874635原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。