rabbitmq取消自动重连_RabbitMQ---8、连接断开处理-断线重连

本文转载于:https://www.itsvse.com/thread-4636-1-1.html;

参考文献:http://www.likecs.com/show-29874.html;https://stackoverflow.com/questions/41279186/guaranteed-publishing-of-messages-on-rabbitmq-on-network-loss;

Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,就会导致消费者或者生产者挂掉。

下图是生产者发送消息,我手动停止了rabbitmq,然后又重新启动了rabbitmq,大概等启动成功以后,为了防止服务没有完全启动,我又等待了10秒钟

服务完全启动成功以后,我尝试重新发送一些消息,报错,如下:

************** 异常文本 **************

RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=320, text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'", classId=0, methodId=0, cause=

在 RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)

在 RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)

在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)

在 RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, IBasicProperties basicProperties, Byte[] body)

在 rabbitMQ_Publish.Form1.button1_Click(Object sender, EventArgs e) 位置 C:\project\my\RabbitMQ-demo\rabbitMQ-Publish\Form1.cs:行号 37

在 System.Windows.Forms.Control.OnClick(EventArgs e)

在 System.Windows.Forms.Button.OnClick(EventArgs e)

在 System.Windows.Forms.Button.PerformClick()

在 System.Windows.Forms.Form.ProcessDialogKey(Keys keyData)

在 System.Windows.Forms.TextBoxBase.ProcessDialogKey(Keys keyData)

在 System.Windows.Forms.Control.PreProcessMessage(Message& msg)

在 System.Windows.Forms.Control.PreProcessControlMessageInternal(Control target, Message& msg)

在 System.Windows.Forms.Application.ThreadContext.PreTranslateMessage(MSG& msg)

 

那么如何会异常恢复呢?或者说断线重连呢?

RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下

var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };

复制代码

具体的恢复机制如下

1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理

public void init()

{

m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());

AutorecoveringConnection self = this;

EventHandler recoveryListener = (_, args) =>

{

lock (recoveryLockTarget)

{

if (ShouldTriggerConnectionRecovery(args))

{

try

{

self.BeginAutomaticRecovery();

}

catch (Exception e)

{

// TODO: logging

Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);

}

}

}

};

lock (m_eventLock)

{

ConnectionShutdown += recoveryListener;

if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))

{

m_recordedShutdownEventHandlers.Add(recoveryListener);

}

}

}

复制代码

观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate

protected void PerformAutomaticRecovery()

{

lock (recoveryLockTarget)

{

RecoverConnectionDelegate();

RecoverConnectionShutdownHandlers();

RecoverConnectionBlockedHandlers();

RecoverConnectionUnblockedHandlers();

RecoverModels();

if (m_factory.TopologyRecoveryEnabled)

{

RecoverEntities();

RecoverConsumers();

}

RunRecoveryEventHandlers();

}

}

复制代码

这个方法中调用的是

protected void RecoverConnectionDelegate()

{

bool recovering = true;

while (recovering)

{

try

{

m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());

recovering = false;

}

catch (Exception)

{

// TODO: exponential back-off

Thread.Sleep(m_factory.NetworkRecoveryInterval);

// TODO: provide a way to handle these exceptions

}

}

}

复制代码

可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用Thread.Sleep来等待一下,然后再次执行连接恢复。


版权声明:本文为weixin_31300697原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。