rabbitmq多线程
This article is based on some basic understanding of Rabbitmq(Queue,Consumer,Producer etc). When communicating with Rabbitmq server, a connection is required. The design of the connection in any framework is more or less the same. The Connection has to wrap a TCP/UDP Socket connection in one way or another. In amqp-client, SocketFrameHandler is used to wrap a TCP Socket.
本文基于对Rabbitmq的一些基本理解(Queue,Consumer,Producer等)。 与Rabbitmq服务器通信时,需要连接。 任何框架中的连接设计大致相同。 连接必须以一种或另一种方式包装TCP / UDP套接字连接。 在amqp-client中 , SocketFrameHandler用于包装TCP套接字。
SocketFrameHandler provides the functionality of reading from and writing to the socket.
SocketFrameHandler提供读取和写入套接字的功能。

AQMPConnection class represents the client-side connection used to communicate with the Server. _frameHandler is one attribute inside this class. This is equal to that one Connection is wrapping around one TCP socket.
AQMPConnection类表示用于与服务器通信的客户端连接。 _frameHandler是此类中的一个属性。 这等于一个连接环绕一个TCP套接字。
In Rabbitmq, Frame is a basic message unit in AMQP protocol. A Channel is used to send and receive frames. Channel is just a logical concept, each channel has one Id. Multiple channels can be created on the connection. This id of the channel is set on the Frame before sent over the connection.
在Rabbitmq中, 帧是AMQP协议中的基本消息单元。 通道用于发送和接收帧。 通道只是一个逻辑概念,每个通道都有一个ID。 可以在连接上创建多个通道。 在通过连接发送之前,在框架上设置通道的此ID。

Since Channel is used to send and receive the frames, but eventually frame has to be sent/received over the connection(TCP Socket). So each channel actually contains the connection as one attribute.
由于Channel用于发送和接收帧,因此最终必须通过连接(TCP套接字)发送/接收帧。 因此,每个通道实际上都将连接包含为一个属性。

When a channel is sending a Frame, it has to use the connection. Now imagine a situation, two channels are created for the same connection, and two different threads are used to handle each channel.
通道发送帧时,必须使用连接。 现在设想一种情况,为同一连接创建两个通道,并使用两个不同的线程来处理每个通道。

Channel A sends a frame. Channel B also sends a frame. Since both channels are using the same connection(TCP socket). Two frames can not be sent at the same time. They have to be serialized. Otherwise, the message will be corrupted. So in the writeFrame function of SocketFrameHandler, the DataOutputStream is synchronized. The same goes for DataInputStream)
通道A发送一个帧。 通道B也发送一个帧。 由于两个通道都使用相同的连接(TCP套接字)。 不能同时发送两个帧。 它们必须被序列化。 否则,该消息将被破坏。 所以在SocketFrameHandler的writeFrame功能,DataOutputStream类是同步的。 DataInputStream也是如此)
SocketFrameHandler.javapublic void writeFrame(Frame frame) throws IOException {
synchronized(this._outputStream) {
frame.writeTo(this._outputStream);
}
}In order to communicate with the server, A connection has to be created. ConnectionFactory can be used to create a connection.
为了与服务器通信,必须创建一个连接。 ConnectionFactory可用于创建连接。
FrameHandlerFactory fhFactory = ConnectionFactory.createFrameHandlerFactory();//create FrameHandler
FrameHandler handler = fhFactory.create(addr, clientProvidedName);//create Connection
AMQConnection conn = this.createConnection(params, handler, this.metricsCollector)We know that TCP socket actually exists inside the FrameHandler. FrameHandler is one attribute in the AMQConnection. So in order to construct the AMQConnection, FrameHandler should be created first.
我们知道TCP套接字实际上存在于FrameHandler中 。 FrameHandler是在AMQConnection一个属性。 因此,为了构造AMQConnection ,应首先创建FrameHandler 。
After an AMQConnection is created, we can call start() function on the connection. start() function will start a separate thread running on this connection to wait for incoming frames on the socket. This thread is normally called IO thread.
创建AMQConnection之后,我们可以在连接上调用start()函数。 start()函数将启动在此连接上运行的单独线程,以等待套接字上的传入帧。 该线程通常称为IO线程。

As said earlier, Channel is used to handle send and receive frames. In the frame header, there is a channel#id field. So after SocketFrameHandler reads the frame from the socket, it will deliver the frame to the correct channel on this connection. Channel has a methodhandleFrame(frame).
如前所述,Channel用于处理发送和接收帧。 在帧头中,有一个channel#id字段。 因此,在SocketFrameHandler从套接字读取帧之后,它将把该帧传递到此连接上的正确通道。 Channel具有方法handleFrame(frame)。
Now imagine that we start 3 connections. Then there will be three forked threads which are reading frames from each socket.
现在想象一下,我们开始了3个连接。 然后将有三个分叉的线程正在从每个套接字读取帧。
In AMQP protocol, a complete command is composed of multiple frames. Each frame contains only a part of one AMQCommand. Channel will use AMQCommandclass to process the received frames, compose a Command, and then dispatch to a thread pool to consume the message. CommandAssembleristhe class that isassembling frames into a Command.
在AMQP协议中,一个完整的命令由多个帧组成。 每个帧仅包含一个AMQCommand的一部分。 Channel将使用AMQCommand类处理接收到的帧, 编写一个Command,然后分派到线程池以使用该消息。 CommandAssembler为 那个阶级 将框架组装成Command。

In Rabbitmq, one connection could contain multiple channels. There is no direct relationship between channel and queue. Multiple consumers could be configured for the same queue.
在Rabbitmq中,一个连接可以包含多个通道。 通道和队列之间没有直接关系。 可以为同一队列配置多个使用者。

In the above setup, connection#1 has two channels. Both channels are used for reading and writing messages. In Rabbitmq, Channel is not thread-safe. So it’s better to use different threads to handle different channels.
在上述设置中, 连接#1具有两个通道。 两个通道均用于读取和写入消息。 在Rabbitmq中,Channel不是线程安全的。 因此,最好使用不同的线程来处理不同的通道。
Consider the following case:
考虑以下情况:
If we calledchannel#0.basicConsume(“queueB”)fromconnection#2. Does Rabbitmq deliver the message to consumer#3 of queueB also using connection#2? Is it possible that the Rabbitmq uses a different connection such as connection#1 to deliver the message to consumer#3?
如果我们从连接#2调用channel#0.basicConsume(“ queueB”) 。 不提供的RabbitMQ的等级列队B也使用连接#2的消息, 消费者#3? Rabbitmq是否可能使用其他连接(例如connection#1)将消息传递给消费者#3 ?
Answer: Rabbitmq will use the connection#2 to deliver the messages. It will not use connection#1. Because channels are bounded to connections. When the connection receives the frame, AMQConnection.readFramewill find the channel which has the same channel-id as the one contained in the frame header. Channels for different connections might have different channel ids. If the frame is received in a different connection, then AMQConnection.readFrameof connection#1may not be able to find the channel with the same channel-id.
答:Rabbitmq将使用连接#2传递消息。 它不会使用connection#1 。 因为通道受限于连接。 当连接接收到该帧时, AMQConnection.readFrame将找到具有与帧头中包含的通道ID相同的通道ID的通道 。 不同连接的通道可能具有不同的通道ID。 如果在不同的连接中接收到该帧,则连接#1的 AMQConnection.readFrame可能无法找到具有相同通道ID的通道。

The above is the workflow after creating and starting a connection from the amqp-client.jar. Every connection maintains ConsumerWorkService thread pool. This thread pool is used to handle the tasks inserted by IO thread on the connection. Each task is using the Consumer handleDeliveryto handle the message. Users normally has to implement the handleDelivery themselves.
以上是从amqp-client.jar创建并启动连接之后的工作流程。 每个连接都维护ConsumerWorkService线程池。 该线程池用于处理IO线程在连接上插入的任务。 每个任务都使用Consumer handleDelivery处理消息。 用户通常必须自己实现handleDelivery 。
Lastly, Rabbitmq recommends
最后,Rabbitmq建议
- Use one connection per process. Or one connection for publishing, one connection for consuming每个进程使用一个连接。 或一个连接用于发布,一个连接用于消费
- Use one channel per thread每个线程使用一个通道
- Don’t use too many queues. Rabbitmq uses a different thread to handle each queue. Otherwise, it will introduce too many context switching.不要使用太多队列。 Rabbitmq使用不同的线程来处理每个队列。 否则,它将引入过多的上下文切换。
If anything wrong, please leave a comment.
如果有任何问题,请发表评论。
翻译自: https://medium.com/swlh/rabbitmq-thread-model-62d5505d68ce
rabbitmq多线程