RabbitMQ客户端开发

github
RabbitMQ Tutorials
rabbitmq-dotnet-client
参考《RabbitMQ实战指南》

连接RabbitMQ

通常情况下,在调用CreateConnection或者CreateModel 方法之后,可以简单地认为Connection或者Channel已经成功地处于开启状态。可以捕获IOExceptio、SocketException、TimeoutException获取创建Connection异常。

using RabbitMQ.Client;
using System;

namespace Utils
{
    public class MQClientConnUtils
    {
        /// <summary>
        /// 主机
        /// </summary>
        private static readonly String HOST = "127.0.0.1";
        
        /// <summary>
        /// RabbitMQ(AMQP) 服务端默认端口号为 5672
        /// </summary>
        private static readonly int PORT = 5672;

        /// <summary>
        /// 虚拟主机
        /// </summary>
        private static readonly String VIRTUALHOST = "/";

        /// <summary>
        /// 用户名
        /// </summary>
        private static readonly String USERNAME = "guest";

        /// <summary>
        /// 密码
        /// </summary>
        private static readonly String PASSWORD = "guest";   
        
        /// <summary>
        /// 获取RabbitMQ Connection连接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection() 
        {
            //配置连接工厂
            ConnectionFactory connectionFactory = new()
            {
                HostName = HOST,
                Port = PORT,

                //如果配置有用户名密码以及vhost,则配置即可。
                UserName = USERNAME,
                Password = PASSWORD,
                VirtualHost = VIRTUALHOST,
            };

            //连接工厂创建连接
            return connectionFactory.CreateConnection();
        }   
        
        /// <summary>
        /// 获取RabbitMQ Connection连接
        /// </summary>
        /// <param name="host"></param>
        /// <param name="port"></param>
        /// <param name="virtualHost"></param>
        /// <param name="username"></param>
        /// <param name="password"></param>
        /// <returns></returns>
        public static IConnection GetConnection(String host, int port, String virtualHost, String username, String password)
        {
            //配置连接工厂
            ConnectionFactory connectionFactory = new()
            {
                HostName = host,
                Port = port,

                //如果配置有用户名密码以及vhost,则配置即可。
                VirtualHost = virtualHost,
                UserName = username,
                Password = password,
            };

            //连接工厂创建连接
            return connectionFactory.CreateConnection();
        }

        /// <summary>
        /// 关闭RabbitMQ连接
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="connection"></param>
        public static void Close(IModel channel, IConnection connection) 
        {
            if (channel != null) channel.Close();

            if (connection != null) connection.Close();
        }
    }
}

创建通道

using (IModel channel = connection.CreateModel())

using (var channel = connection.CreateModel())
Connection可以用来创建多个Channel实例,但是Channel实例不能在线程问共享,应用程序应该为每一个线程开辟一个Channel。

使用交换机和队列

void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);

声明交换器exchangeDeclare常用参数说明如下:

  • exchange: 交换器的名称。
  • type: 交换器的类型,常见的如 fanout direct topic 参考:交换机类型
  • durable: 设置是否持久durable设置为true表示持久化,反之是非持久。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete:设置是否自动删除。autoDelete设置为true表示自动删除,自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与它解绑就会自动删除,一般都设置为fase。autoDelete属性针对的是曾经有过但后来没有的事物。
  • internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如ternate exchange。
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

声明队列QueueDeclare常用参数说明如下:

  • queue:队列的名称。
  • durable: 设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive:设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排
    他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意
    三点:排他队列是基于连接( Connection) 可见的,同一个连接的不同信道 (Channel)
    是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个
    排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队
    列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列
    适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete: 设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:
    至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  • argurnents: 设置队列的其他一些参数。

生产者和消费者都可以声明一个交换器或者队列。如果尝试声明一个已经存在的交换器或者队列,只要声明的参数完全匹配现存的交换器或者队列,RabbitMQ就可以什么都不做,并成功返回,如果声明的参数不匹配则会抛出异常。

生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为"传输"模式,之后才能声明队列。

队列绑定

public static void QueueBind(this IModel model, string queue, string exchange, string routingKey, IDictionary<string, object> arguments = null);

QueueBind方法将队列和交换器绑定参数说明:

  • queue: 队列名称。
  • exchange: 交换器的名称。
  • routingKey: 用来绑定队列和交换器的路由键。
  • argument: 定义绑定的一些参数。

生产者和消费者都应该尝试创建(声明操作)队列。但不适用于所有的情况。
如果业务本身在架构设计之初己经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、 RabbitMQ命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。

exchangeBind方法用于交换之间的绑定。

发送消息

public static void BasicPublish(this IModel model, string exchange, string routingKey, bool mandatory = false, IBasicProperties basicProperties = null, ReadOnlyMemory<byte> body = default);

BasicPublish方法常用参数说明:

  • exchange: 交换器的名称,指明消息需要发送到哪个交换器中 如果设置为空字符串,则消息会被发送到 bbitMQ 默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。
  • props:消息的基本属性集。
  • byte[] body:消息体 pay1oad,真正需要发送的消息。
  • mandatory 和 immediate 消息何去何从。

消费消息

RabbitMQ的消费模式分两种:推(push)模式和拉(Pull)模式,推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。推模式更关注实时性,拉模式更关注消费者消费能力。

推模式:
消息中间件主动将消息推送给消费者。将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。好处很明显,消费者总是有一堆在内存中待处理的消息,所以效率高。缺点:是缓冲区可能会溢出。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using Utils;

namespace RabbitConsumerPush
{
    class Program
    {
        private static readonly String EXCHANGE_NAME = "exchange_demo2";

        private static readonly String ROUTING_KEY = "routingkey_demo2";

        private static readonly String QUEUE_NAME = "queue_demo2";
        static void Main(string[] args)
        {
            // 实例化一个连接
            using IConnection connection = MQClientConnUtils.GetConnection();
            // 实例化一个信道
            using IModel channel = connection.CreateModel();
            // 创建一个 type="direct" 、持久化的、非自动删除的交换器
            channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 创建一个持久化、非排他的、非自动删除的队列
            channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
            // 将交换器与队列通过路由键绑定,这个地方的ROUTING_KEY是binding key
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            //绑定消息接收事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };

            //推(push)模式消费消息
            channel.BasicConsume(queue: QUEUE_NAME,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

拉模式:
消费者主动从消息中间件拉取消息。在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。

推模式是做最常用的,但是某些情况下推模式并不适用。比如由于某些限制,消费者在某个条件成立时才能消费消息。需要批量拉取消息进行处理。可以考虑拉模式。RabbitMQ支持客户端批量拉取消息,可以连续调用basicGet方法拉取多条消息,处理完毕一次性返回ACK。使用BasicAck批量ACK传递的参数是最后一条消息的DeliveryTag。

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Utils;

namespace RabbitConsumerPull
{
    class Program
    {
        private static readonly String EXCHANGE_NAME = "exchange_demo2";

        private static readonly String ROUTING_KEY = "routingkey_demo2";

        private static readonly String QUEUE_NAME = "queue_demo2";
        static void Main(string[] args)
        {
            // 实例化一个连接
            using IConnection connection = MQClientConnUtils.GetConnection();
            // 实例化一个信道
            using IModel channel = connection.CreateModel();
            // 创建一个 type="direct" 、持久化的、非自动删除的交换器
            channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 创建一个持久化、非排他的、非自动删除的队列
            channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
            // 将交换器与队列通过路由键绑定,这个地方的ROUTING_KEY是binding key
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);


            //拉(pull)模式消费消息
            BasicGetResult res = channel.BasicGet(QUEUE_NAME, true);
            if (res != null)
            {
                var message = Encoding.UTF8.GetString(res.Body.ToArray());
                Console.WriteLine(" [x] Received {0}", message);
            }

            //ReceivePull(channel);


            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
        //由于某些限制,消费者在某个条件成立时才能消费消息。需要批量拉取消息进行处理。才考虑拉模式。否则没必要开线程或循环使用拉模式取代推模式。。。。
        private static void ReceivePull(IModel channel)
        {
            //存放消息实体List
            List<MessageEntity> list = new List<MessageEntity>();

            double start = new TimeSpan(DateTime.Now.Ticks).TotalSeconds;
            MessageEntity entity = new MessageEntity();

            while (true)
            {
                //拉取消息
                BasicGetResult res = channel.BasicGet(QUEUE_NAME, false);

                if (res == null)
                {
                    //间隔时间,如果超过10s还没有消费到新到消息,则将消息入库,保证实效性
                    double interval = (new TimeSpan(DateTime.Now.Ticks).TotalSeconds) - start;
                    if (list != null && interval > 10)
                    {
                        //批量确认消息
                        channel.BasicAck(entity.getTag(), true);

                        //模仿业务处理
                        //......

                        list.Clear();

                        start = new TimeSpan(DateTime.Now.Ticks).TotalSeconds;
                    }
                    continue;
                }
                String str = Encoding.UTF8.GetString(res.Body.ToArray());
                entity.setMessage(str);
                entity.setTag(res.DeliveryTag);
                list.Add(entity);
                //100条消息批量入库一次
                if (list.Count % 100 == 0)
                {
                    //批量确认消息
                    channel.BasicAck(entity.getTag(), true);
                    //模仿业务处理
                    //......

                    list.Clear();

                    start = new TimeSpan(DateTime.Now.Ticks).TotalSeconds;
                }

                Thread.Sleep(1000);
            }

        }
    }

    class MessageEntity
    {
        private String message;
        private ulong tag;

        public String getMessage()
        {
            return message;
        }

        public void setMessage(String message)
        {
            this.message = message;
        }

        public ulong getTag()
        {
            return tag;
        }

        public void setTag(ulong tag)
        {
            this.tag = tag;
        }
    }

}

消费确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制后,只要设置autoAck参数为false,消费者就有足够的时间处理消息(任务) ,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显式调Basic.Ack命令为止。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using Utils;

namespace RabbitConsumerAck
{
    class Program
    {
        private static readonly String EXCHANGE_NAME = "exchange_demo2";

        private static readonly String ROUTING_KEY = "routingkey_demo2";

        private static readonly String QUEUE_NAME = "queue_demo2";
        static void Main(string[] args)
        {
            // 实例化一个连接
            using IConnection connection = MQClientConnUtils.GetConnection();
            // 实例化一个信道
            using IModel channel = connection.CreateModel();
            // 创建一个 type="direct" 、持久化的、非自动删除的交换器
            channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 创建一个持久化、非排他的、非自动删除的队列
            channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
            // 将交换器与队列通过路由键绑定,这个地方的ROUTING_KEY是binding key
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            ulong deliveryTag = 0;
            //绑定消息接收事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                deliveryTag = ea.DeliveryTag;
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Reject {0}, deliveryTag {1}", message, deliveryTag);
            };


            //推(push)模式消费消息
            channel.BasicConsume(queue: QUEUE_NAME,
                                 autoAck: false,//为true时自动确认
                                 consumer: consumer);
            //当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

            //从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。
            //为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制,消费者在订阅队列时,可以指定 autoAck参数设置为false,并进行手动确认显式调Basic.Ack命令。
            //采用消息确认机制后,消费者就有足够的时间处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。

            Console.WriteLine(" Press [enter] to exit and Ack");

            Console.ReadLine();

            channel.BasicAck(deliveryTag: deliveryTag, multiple: false);//手动显示确认
        }
    }
}

autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的
依据是消费该消息的消费者连接是否己经断开,这么设计的原因是RabbitMQ默认允许消费者消费一条消息的时间可以很久很久。

在RabbtiMQ Web管理平台上可以看到当前队列中的 “Ready” 状态和"Unacknowledged" 状态的消息数,分别对应等待投递给消费者的消息数和己经投递给消费者但是未收到确认信号的消息数。

消费者客户端可以调用与其对应的channel.basicReject方法来告诉RabbitMQ拒绝这个消息
channel.BasicReject(long deliveryTag, boolean requeue) 其中 deliveryTag 可以看作消息的编号,如果requeue参数设置为true ,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者包括上次拒收的消费者;如果requeue参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。BasicReject一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用BasicNack。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using Utils;

namespace RabbitConsumerReject
{
    class Program
    {
        private static readonly String EXCHANGE_NAME = "exchange_demo2";

        private static readonly String ROUTING_KEY = "routingkey_demo2";

        private static readonly String QUEUE_NAME = "queue_demo2";
        static void Main(string[] args)
        {
            // 实例化一个连接
            using IConnection connection = MQClientConnUtils.GetConnection();
            // 实例化一个信道
            using IModel channel = connection.CreateModel();
            // 创建一个 type="direct" 、持久化的、非自动删除的交换器
            channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 创建一个持久化、非排他的、非自动删除的队列
            channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
            // 将交换器与队列通过路由键绑定,这个地方的ROUTING_KEY是binding key
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            bool autoAckStu = false;
            //绑定消息接收事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                if (message.Equals("Hello World !"))
                {
                    autoAckStu = false;
                    channel.BasicReject(ea.DeliveryTag, true);//拒绝消息,requeue为false ,则 RabbitMQ立即会把消息从队列中移除。requeue为true 留给其他消费者,自己还会再次接收到。。。
                    Console.WriteLine(" [x] Reject {0}", message);
                }
                else
                {
                    autoAckStu = true;
                    Console.WriteLine(" [x] Received {0}", message);
                }

            };


            //推(push)模式消费消息
            channel.BasicConsume(queue: QUEUE_NAME,
                                 autoAck: autoAckStu,
                                 consumer: consumer);



            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

生产者发送消息
在这里插入图片描述消费者收到消息但未确认
在这里插入图片描述

关闭连接

public static void Close(IModel channel, IConnection connection) 
{
    if (channel != null) channel.Close();

    if (connection != null) connection.Close();
}

日志存储位置

Linux 下/var /log/rabbitmq/
windows下C:\Users\Administrator\AppData\Roaming\RabbitMQ\log


版权声明:本文为qq_39827640原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。