RabbitMQ Producer C# .NET Core 5.0 内存泄漏

RabbitMQ Producer C# .NET Core 5.0 Memory Leak

我想知道是否有人可以帮助解决以下情况:

我无法解决一个 RabbitMQ 发布者(Publisher)的内存泄漏问题,该程序用 C# 编写,运行在 .NET Core 5.0 上。

这是 csproj 文件:

<Project Sdk="Microsoft.NET.Sdk">   
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net5.0</TargetFramework>
    <RuntimeIdentifier>win-x64</RuntimeIdentifier>
  </PropertyGroup>
  ...
</Project>  

我有一个运行在虚拟机中的 .NET 控制台应用程序,它通过 API(一个已注册的 64 位 DLL,以 COM 引用的方式引入)连接到一台服务器,从该服务器获取信息,然后尝试将这些信息发布到位于 AWS 云上的 RabbitMQ 机器(该 RMQ 实例有多个节点,前面有一个负载均衡器)。

访问代码中的API是通过以下方式完成的:

        /// <summary>
        /// Sends the connection options to the API, logs whatever the API returns,
        /// and then attaches the connect, disconnect and new-data handlers.
        /// </summary>
        private void SetUpApi ()
        {
            const string connectionOptions = "<CONNECTIONOPTIONS><CALCULATED_PRICES Enabled='true' MaximumDepth='4'/></CONNECTIONOPTIONS>";

            var optionsResult = _api.SetAPIOptions (connectionOptions);
            Utils.log.Info (optionsResult);

            _api.OnServerConnect += OnServerConnect;
            _api.OnServerDisconnect += OnServerDisconnect;
            _api.OnNewData += OnNewData;
        }
        /// <summary>
        /// API callback for incoming data. Payloads that contain an ORDER element are
        /// handed to <c>ParseXMLAnswer</c>; everything else is ignored.
        /// </summary>
        /// <param name="strXML">Raw XML text delivered by the API.</param>
        private void OnNewData(string strXML){
            try{
                if (strXML.Contains("<ORDER")){
                    ParseXMLAnswer(strXML, "OnNewData ()");
                }
            }
            catch (Exception ex) {
                // The broker exceptions can surface either directly (as thrown by the
                // RabbitMQ client) or wrapped in another exception, so inspect both the
                // exception itself and its InnerException.
                bool brokerUnavailable =
                    ex is AlreadyClosedException || ex is BrokerUnreachableException ||
                    ex.InnerException is AlreadyClosedException || ex.InnerException is BrokerUnreachableException;

                if (brokerUnavailable)
                    Utils.log.Error("OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException ");
                else
                    Utils.printException("OnNewData ()", ex);
            }
        }
        
        /// <summary>
        /// Converts an XML payload to JSON and forwards every order found under
        /// APIDATA/ORDER to <c>SendOrderToRMQ</c>. A single order arrives as a JSON
        /// object, several orders as a JSON array; a payload without that node is ignored.
        /// </summary>
        /// <param name="strOutputXML">XML document text to parse.</param>
        /// <param name="caller">Label of the calling method; not used by this method.</param>
        private void ParseXMLAnswer(string strOutputXML, string caller) {
            XmlDocument doc = new XmlDocument();
            doc.LoadXml(strOutputXML);
            string jsonText = JsonConvert.SerializeXmlNode(doc);
            var o = JObject.Parse(jsonText);

            // Resolve the ORDER node once; it is null when the payload has no APIDATA/ORDER.
            var orderToken = o["APIDATA"]?["ORDER"];

            if (orderToken is JObject order){
                SendOrderToRMQ(order);
            }
            else if (orderToken is JArray orders){
                foreach (var item in orders.Children()){
                    SendOrderToRMQ((JObject)item);
                }
            }

            // No GC.Collect()/WaitForPendingFinalizers() here: forcing a blocking
            // collection for every incoming message stalls the callback thread, and the
            // locals above become unreachable on return anyway.
        }

        /// <summary>
        /// Publishes one order to RabbitMQ when its first sequence id and item id are
        /// present in <c>sequenceItemsHashed</c>. The routing key is built as
        /// instrument.sequence.item.position, where position is the 1-based index of the
        /// item id in the list registered for the sequence.
        /// </summary>
        /// <param name="order">JSON order object containing an INSTSPECIFIER child.</param>
        private void SendOrderToRMQ (JObject order){
            JObject instrSpeciefier = (JObject) order["INSTSPECIFIER"];

            var firstSeqID = instrSpeciefier.GetValue("@FirstSequenceID").ToString();
            var firstSeqItemID = instrSpeciefier.GetValue("@FirstSequenceItemID").ToString();

            // One dictionary lookup and one list scan instead of ContainsKey + indexer
            // and Contains + IndexOf.
            if (!sequenceItemsHashed.TryGetValue(firstSeqID, out var sequenceItems))
                return;

            int position = sequenceItems.IndexOf(firstSeqItemID);
            if (position < 0)
                return;

            string itemName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@FirstSequenceItemName").ToString());
            string instrumentName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@InstName").ToString());

            int index = position + 1;
            var binding = instrumentName + "." + sequencesFromInstruments[firstSeqID] + "." + itemName + "." + index;

            serviceInstance1.Publish(
               order.ToString(),
               _exchangeName,
               "",
               binding);
        }

高峰期 营业时间内,我从API 得到大约400 - 500 messages/second。这些消息以 XML 消息的形式出现。例如,一条消息可以包含多个订单,如下例所示。一个元素可以包含插入(创建)订单的操作,另一个元素可以包含删除特定订单的操作。

<?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER EngineID="0" PersistentOrderID="2791" ...>
    <INSTSPECIFIER InstID="287" ... />
    ...
  </ORDER>
  <ORDER EngineID="0" PersistentOrderID="9840" ...>
    <INSTSPECIFIER InstID="288" ... />
    ...
  </ORDER>

RabbitMQ 服务器已配置,我使用 SSL(带有证书和密钥)连接到它。 我使用RabbitMQ.Client v6.2.1 连接到RMQ。 交换、队列和绑定已经在 RabbitMQ 中定义。我的 Producer 应用程序只连接到它并开始发布。

使用同步发布方法对我来说很重要,因为消息到达的顺序非常关键。例如,我们在一条消息中收到创建订单的操作,紧随其后的另一条消息要求删除同一个订单。如果我使用异步(async)方法发布到 RMQ,删除操作可能会先于插入操作到达。

  <?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Insert" ...>
      ...
  </ORDER>

删除消息:

  <?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Remove" ...>
      ...
  </ORDER>

我使用以下方法发布到 RMQ:对象池(Microsoft 提供了一个名为 Microsoft.Extensions.ObjectPool 的包)- 此处描述的方法 - https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/ .

我在这里使用以下代码:

/// <summary>
/// Publishes messages to RabbitMQ using channels borrowed from an object pool.
/// </summary>
class RabbitManager : IRabbitManager
{
    private readonly DefaultObjectPool<IModel> _objectPool;

    /// <summary>
    /// Creates the channel pool, retaining at most twice the processor count of channels.
    /// </summary>
    /// <param name="objectPolicy">Policy that creates channels and decides whether a returned one is kept.</param>
    public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy){
        _objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
    }

    /// <summary>
    /// Publishes <paramref name="message"/> (its <c>ToString()</c> text, UTF-8 encoded)
    /// to the given exchange with the given routing key. A null message is ignored.
    /// </summary>
    /// <param name="message">Payload object; skipped when null.</param>
    /// <param name="exchangeName">Target exchange.</param>
    /// <param name="exchangeType">Not used by this method.</param>
    /// <param name="routeKey">Routing key for the message.</param>
    public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey) where T : class {
        if (message == null)
            return;

        var channel = _objectPool.Get();
        try{
            var sendBytes = Encoding.UTF8.GetBytes(message.ToString());
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            // NOTE(review): AMQP timestamps are conventionally seconds since the Unix
            // epoch, while this passes milliseconds - confirm what the consumers expect.
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);
        }
        // No catch block: the previous "catch { throw ex; }" only reset the stack trace.
        // Exceptions now propagate unchanged and the channel is still returned below.
        finally {
            _objectPool.Return(channel);
        }
    }
}
/// <summary>
/// Object-pool policy that creates RabbitMQ channels on one shared connection and
/// keeps a returned channel only while it is still open.
/// </summary>
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
    private readonly RabbitOptions _options;
    private readonly IConnection _connection;

    /// <summary>
    /// Stores the options and opens the connection immediately.
    /// </summary>
    /// <param name="_options">Host, credentials, virtual host and optional certificate settings.</param>
    public RabbitModelPooledObjectPolicy(RabbitOptions _options){
        this._options = _options;
        _connection = GetConnection();
    }

    /// <summary>
    /// Builds a connection factory from the options (enabling TLS 1.2 when a certificate
    /// path is configured) and opens a connection with automatic recovery enabled.
    /// </summary>
    private IConnection GetConnection() {
        var factory = new ConnectionFactory() {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            //Port = _options.Port,
            VirtualHost = _options.VHost,
        };

        if (!String.IsNullOrEmpty(_options.CertPath))
        {
            factory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
            factory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
            factory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
            factory.Ssl.ServerName = _options.HostName;
            factory.Ssl.CertPath = _options.CertPath;
            factory.Ssl.CertPassphrase = _options.CertPass;
            factory.Ssl.Version = SslProtocols.Tls12;
            factory.Ssl.Enabled = true;
        }

        // NOTE(review): a one-second heartbeat is very aggressive for a link into a
        // cloud load balancer - confirm this value is intentional.
        factory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
        factory.AutomaticRecoveryEnabled = true;        // enable automatic connection recovery
        factory.RequestedChannelMax = 32;

        // Named "connection" so the local no longer shadows the _connection field.
        var connection = factory.CreateConnection();
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        return connection;
    }

    /// <summary>Logs that the connection has shut down.</summary>
    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info("Connection broke!");
    }

    /// <summary>
    /// Certificate validation callback that accepts every server certificate.
    /// </summary>
    /// <remarks>
    /// NOTE(review): returning true unconditionally disables server certificate
    /// validation - confirm this is acceptable outside of test environments.
    /// </remarks>
    private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors){
        return true;
    }

    /// <summary>Creates a new channel on the shared connection.</summary>
    public IModel Create(){
        return _connection.CreateModel();
    }

    /// <summary>
    /// Decides whether a channel goes back into the pool.
    /// </summary>
    /// <param name="obj">Channel being returned; may be null.</param>
    /// <returns>true when the channel is open and may be reused; otherwise false, after disposing it.</returns>
    public bool Return(IModel obj) {
        // The null check now comes before IsOpen is read; previously a null channel
        // threw before the null-conditional Dispose could help.
        if (obj != null && obj.IsOpen) {
            return true;
        }

        obj?.Dispose();
        return false;
    }
}

下面是有问题的屏幕截图 - 内存不断增加:

这是在上面的屏幕截图之后拍摄的内存快照的堆栈跟踪:

截取上面的屏幕截图后,我的程序在控制台中收到以下错误消息:

26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
   at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
   at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
   at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
   at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
   at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
   at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
   at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
   at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348

这导致程序最终崩溃:

我做了什么来防止内存泄漏 :

  1. 为各个类实现了 IDisposable 接口:
        /// <summary>
        /// Releases this instance's resources by running the disposing path of
        /// <c>Dispose(bool)</c>, then tells the GC that finalization is no longer needed.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases managed resources when <paramref name="disposing"/> is true.
        /// </summary>
        /// <param name="disposing">true when called from <c>Dispose()</c>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // Detach every handler that SetUpApi attached, not just OnNewData, so the
                // API object no longer holds references to this instance.
                _api.OnNewData -= OnNewData;
                _api.OnServerConnect -= OnServerConnect;
                _api.OnServerDisconnect -= OnServerDisconnect;

                // free managed resources (the timer may not have been created yet)
                _onTimePerHour?.Dispose();
            }
            // free native resources if there are any.
        }
  2. 收到每条消息后强制进行垃圾回收:
private void ParseXMLAnswer(string strOutputXML, string caller) {
            ...
            doc = null;
            GC.Collect();
            GC.WaitForPendingFinalizers();
}

这有一点帮助:现在内存要经过更长的时间才会增长到出问题的程度。

  3. 我使用了 Visual Studio 的 ReSharper 插件 (enter link description here),希望能更好地理解内存问题的堆栈跟踪,但帮助不大。

我觉得是什么问题 :

RabbitMQ Producer 应用程序每秒获取许多消息,然后对这些消息进行解析,拆分为多个 JSON 消息并使用相同的通道发送到 RMQ。可能会出现以下情况:

以前有人遇到过这个问题吗?因为我在任何地方都找不到有关此“SingleProducerSingleConsumerQueue+Segment 内存不足”问题的任何信息。

有人知道如何更深入地分析这个问题吗?

非常感谢!


编辑 1

我想解决这个内存问题需要更多信息。

我有几个消费者使用来自 RabbitMQ 的数据(比如 NodeJS 和 python 应用程序)。因此,我需要以通用方式设计 RabbitMQ 生产者,因为每个消费者需要不同的数据。每次我有一个新的消费者应用程序时,我都无法修改和重新启动我的 RabbitMQ 生产者。所以我需要以通用的方式发布我的消息。

例如,每个消费者都有自己的专用队列,具有专用绑定。假设我有带有队列 cons1 和绑定的 consumer1 :

此绑定是动态的,目前它对应于星期一 (04.April),但明天它将对应于星期二 (05.April)。

所以我需要使用

将市场名称和产品名称存储在内存中
// Sequence id -> ordered list of the item ids that belong to that sequence.
private static readonly Dictionary<string, List<string>> sequenceItemsHashed = new Dictionary<string, List<string>>();
// Sequence id -> name used as the second segment of the routing key.
private static readonly Dictionary<string, string> sequencesFromInstruments = new Dictionary<string, string>();

需要说明的是,在我的逻辑中 sequenceItemsHashed 对应 marketNames,sequencesFromInstruments 对应 productNames。

通过这种方式,我将所有消息发送到 RMQ,然后使用绑定在 RMQ 中进行排序。


编辑 2

据我了解,为了解决我的问题,我需要类似以下架构的东西 (enter link description here) :

所以在我到 RMQ 服务器的单个连接中有多个线程,每个线程一个通道。


编辑 3

已成功实施管道、ConcurrentQueue 和推送到 RMQ 的消费者线程,但仍然存在内存问题:

// Dataflow pipeline stages; they are constructed and linked in the Service1 constructor.
// Stage 1: passes through XML that contains an order, maps everything else to null.
private readonly TransformBlock<string, string> orderFilter;
// Stage 2: XML text -> JSON object.
private readonly TransformBlock<string, JObject> xmlParser;
//private readonly TransformBlock<XmlDocument, JObject> xmlToJsonTransformer;
// Stage 3: one parsed document -> zero or more order tokens.
private readonly TransformManyBlock<JObject, JToken> jsonOrderFactory;
// Stage 4: terminal block that receives each order token.
private readonly ActionBlock<JToken> messageSender;

// Hand-off buffer between the pipeline (SendMessage enqueues) and the publishing task (dequeues).
ConcurrentQueue<JToken> concurrentQueue = new ConcurrentQueue<JToken>();

// Builds the dataflow pipeline (filter -> parse -> split -> send) and starts a
// background task that drains concurrentQueue and publishes each token to RabbitMQ.
public Service1 (string [] args) {
    ...
    // setup pipeline blocks
    orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
    xmlParser = new TransformBlock<string, JObject>(ParseXml);
    jsonOrderFactory = new TransformManyBlock<JObject, JToken>(CreateOrderMessages);
    messageSender = new ActionBlock<JToken>(SendMessage);

    // build your pipeline            
    // Non-empty results go to the parser; whatever the predicate rejects falls
    // through to the null target and is discarded.
    orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
    orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs

    // NOTE(review): only the last link sets PropagateCompletion; completing
    // orderFilter will not complete the downstream blocks through the links above.
    xmlParser.LinkTo(jsonOrderFactory);
    jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });

    // Publishing loop: polls the queue forever, sleeping 1 ms whenever it is empty.
    // The loop has no exit condition or cancellation.
    Task t2 = Task.Factory.StartNew(() =>
            {
                while (true) { 
                    if (!concurrentQueue.IsEmpty)
                    {
                        JToken number;
                        // Drain everything currently queued before polling again.
                        while (concurrentQueue.TryDequeue(out number))
                        {
                            _rabbitMQ.PublishMessages(
                                Encoding.ASCII.GetBytes(number.ToString()),
                                "test"
                            );
                        }
                    } else
                    {
                        Thread.Sleep(1);
                    }
                }
            });        
     ...
}

/// <summary>
/// Returns the XML unchanged when it contains the text "&lt;ORDER"; otherwise null.
/// </summary>
private string FilterIncomingMessages(string strXml){
    return strXml.Contains("<ORDER") ? strXml : null;
}

/// <summary>
/// Loads the XML text into a document, serializes that document to JSON and
/// returns the parsed JSON object.
/// </summary>
private JObject ParseXml(string strXml){
    var xml = new XmlDocument();
    xml.LoadXml(strXml);

    return JObject.Parse(JsonConvert.SerializeXmlNode(xml));
}

/// <summary>
/// Extracts the order tokens found under GV8APIDATA/ORDER. A single order (JSON
/// object) yields one token, an array yields one token per element, and a document
/// without that node yields an empty result.
/// </summary>
/// <param name="o">Parsed JSON document.</param>
/// <returns>Array of order tokens; never null.</returns>
private IEnumerable<JToken> CreateOrderMessages(JObject o){
    List<JToken> myList = new List<JToken>();

    // Resolve the ORDER node once. A missing GV8APIDATA or ORDER node gives null here
    // instead of throwing, which would fault the dataflow block running this method.
    var orderToken = o["GV8APIDATA"]?["ORDER"];

    if (orderToken is JObject){
        myList.Add(orderToken);
    }
    else if (orderToken is JArray){
        myList.AddRange(orderToken.Children());
    }

    return myList.ToArray ();
}

// Terminal pipeline step: appends the order token to concurrentQueue.
private void SendMessage(JToken order){
    concurrentQueue.Enqueue(order);
}

新的解决方案有助于将逻辑分成几个小部分,但我的内存仍然在不断增加。


编辑 4

考虑到@Fildor 的回答,我做了以下事情:

我没有将包含 xml 和 元素的字符串转换为 JSON,而是使用管道和下面的代码将 XML 反序列化为对象。

我删除了 Thread 和 ConcurrentQueue 的部分,我直接在最后一个 ActionBlock 中发布。

这解决了我的内存泄漏问题,但还有其他问题,例如:

// Builds the dataflow pipeline (filter -> deserialize -> split -> publish) and then
// calls RunAsConsole with the command-line arguments.
public Service1 (string [] args) {
            ...             
            // setup pipeline blocks
            orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
            xmlParser = new TransformBlock<string, OrdersResponse>(ParseXml);
            jsonOrderFactory = new TransformManyBlock<OrdersResponse, Order>(CreateOrderMessages);
            messageSender = new ActionBlock<Order>(SendMessage);

            // build your pipeline            
            // Non-empty results go to the parser; whatever the predicate rejects falls
            // through to the null target and is discarded.
            orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
            orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
            // NOTE(review): only the last link sets PropagateCompletion.
            xmlParser.LinkTo(jsonOrderFactory);
            jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });

            RunAsConsole(args);
        }

        // Dataflow pipeline stages, in processing order.
        // Stage 1: passes through XML that contains an order, maps everything else to null.
        private readonly TransformBlock<string, string> orderFilter;
        // Stage 2: XML text -> deserialized OrdersResponse.
        private readonly TransformBlock<string, OrdersResponse> xmlParser;
        // Stage 3: one OrdersResponse -> its individual orders.
        private readonly TransformManyBlock<OrdersResponse, Order> jsonOrderFactory;
        // Stage 4: terminal block that receives each order.
        private readonly ActionBlock<Order> messageSender;

        /// <summary>
        /// API callback for incoming data: posts the raw XML to the first pipeline block.
        /// The result of <c>Post</c> is not inspected.
        /// </summary>
        private void OnNewData(string strXML) => orderFilter.Post(strXML);

        /// <summary>
        /// Lets a payload through only when it contains the text "&lt;ORDER";
        /// any other payload is mapped to null.
        /// </summary>
        private string FilterIncomingMessages(string strXml){
            bool hasOrder = strXml.Contains("<ORDER");
            if (!hasOrder) {
                return null;
            }

            return strXml;
        }

        /// <summary>
        /// Pipeline step that deserializes the XML text via <c>DeserializeOrdersFromXML</c>.
        /// </summary>
        private OrdersResponse ParseXml(string strXml) => DeserializeOrdersFromXML(strXml);

        /// <summary>
        /// Deserializes the XML text into an <c>OrdersResponse</c> with an
        /// <c>XmlSerializer</c> created for that type.
        /// </summary>
        /// <param name="strOutputXML">XML document text.</param>
        /// <returns>The deserialized object.</returns>
        private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
            var serializer = new XmlSerializer(typeof(OrdersResponse));

            // Leaving the using block disposes (and thereby closes) the reader.
            using (TextReader xmlText = new StringReader(strOutputXML)) {
                return (OrdersResponse)serializer.Deserialize(xmlText);
            }
        }

        /// <summary>
        /// Pipeline step that fans a response out into its individual orders.
        /// </summary>
        private IEnumerable<Order> CreateOrderMessages(OrdersResponse o) => o.orders;

        /// <summary>
        /// Terminal pipeline step: publishes the order's text to RabbitMQ with the
        /// routing key "test".
        /// </summary>
        /// <param name="order">Order to publish.</param>
        private void SendMessage(Order order) {
            // UTF-8 instead of ASCII: ASCII encoding replaces every non-ASCII character
            // with '?', silently corrupting such payloads. ASCII-only text produces
            // identical bytes under both encodings.
            // NOTE(review): this publishes order.ToString(); unless Order overrides
            // ToString, that is the type name rather than the order data - confirm.
            _rabbitMQ.PublishMessages(
                    Encoding.UTF8.GetBytes(order.ToString()),
                    "test"
                );
        }

ORDER 对象看起来像:

    // XML-mapped order model: each attribute of an ORDER element maps to a string
    // property and the INSTSPECIFIER child element maps to InstrumentSpecifier.
    [Serializable()]
    [XmlRoot (ElementName = "ORDER")]
    public class Order : IDisposable {

        // Clears the properties, disposes the nested InstrumentSpecifier and
        // suppresses finalization.
        // NOTE(review): InstrumentSpecifier.Dispose() is called without a null check,
        // so an order without an INSTSPECIFIER element (or a second Dispose call)
        // would throw here - confirm callers never hit that case.
        public void Dispose()
        {
            EngineID = null;
            PersistentOrderID = null;
            ...
            InstrumentSpecifier.Dispose();
            InstrumentSpecifier = null;
            GC.SuppressFinalize(this);
        }

        // Value of the EngineID attribute.
        [XmlAttribute (AttributeName = "EngineID")]
        public string EngineID { get; set; }
        // Value of the PersistentOrderID attribute.
        [XmlAttribute (AttributeName = "PersistentOrderID")]
        public string PersistentOrderID { get; set; }
        ... 
        // Content of the INSTSPECIFIER child element.
        [XmlElement(ElementName = "INSTSPECIFIER")]
        public InstrumentSpecifier InstrumentSpecifier { get; set; }
    }

还有我的新 RabbitMQ class :

/// <summary>
/// Owns one RabbitMQ connection and one channel and publishes persistent messages
/// to a fixed exchange, re-establishing the connection/channel when they are closed.
/// </summary>
public class RMQ : IDisposable {
    // Serializes (re)connection attempts: both the shutdown handler and
    // PublishMessages can request a reconnect, from different threads.
    private readonly object _reconnectLock = new object();
    private IConnection _connection;
    public IModel Channel { get; private set; }
    private readonly ConnectionFactory _connectionFactory;
    private readonly string _exchangeName;
    // Set once Dispose has started; stops the shutdown handler from reopening the connection.
    private volatile bool _disposed;

    /// <summary>
    /// Configures the connection factory from the options (TLS 1.2 when a certificate
    /// path is set) and opens the connection and channel. Failures are logged, not thrown.
    /// </summary>
    /// <param name="_rabbitOptions">Host, credentials, virtual host, exchange and optional certificate settings.</param>
    public RMQ (RabbitOptions _rabbitOptions){
        try{
            // _connectionFactory initialization
            _connectionFactory = new ConnectionFactory()
            {
                HostName = _rabbitOptions.HostName,
                UserName = _rabbitOptions.UserName,
                Password = _rabbitOptions.Password,
                VirtualHost = _rabbitOptions.VHost,
            };
            this._exchangeName = _rabbitOptions.ExchangeName;

            if (!String.IsNullOrEmpty(_rabbitOptions.CertPath)){
                _connectionFactory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
                _connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
                _connectionFactory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
                _connectionFactory.Ssl.ServerName = _rabbitOptions.HostName;
                _connectionFactory.Ssl.CertPath = _rabbitOptions.CertPath;
                _connectionFactory.Ssl.CertPassphrase = _rabbitOptions.CertPass;
                _connectionFactory.Ssl.Version = SslProtocols.Tls12;
                _connectionFactory.Ssl.Enabled = true;
            }

            _connectionFactory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
            _connectionFactory.AutomaticRecoveryEnabled = true;        // enable automatic connection recovery
            //_connectionFactory.RequestedChannelMax = 10;

            // Same connect-then-create-channel sequence that reconnects use.
            ReconnectToRMQ();
            Utils.log.Info("ConnectToRabbitMQ () Connecting to RabbitMQ. rabbitMQenvironment = ");
        }
        catch (Exception ex){
            Utils.log.Error("Connection to RabbitMQ failed ! HostName = " + _rabbitOptions.HostName + " VirtualHost = " + _rabbitOptions.VHost);
            Utils.printException("ConnectToRMQ ()", ex);
        }
    }

    /// <summary>
    /// Logs the shutdown and tries to reconnect, unless this instance closed the
    /// connection itself (Dispose / DisconnectFromRMQ).
    /// </summary>
    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info ("Connection broke!");

        // A shutdown initiated by this application is intentional; reconnecting here
        // would reopen a connection that was just closed on purpose.
        if (_disposed || e.Initiator == ShutdownInitiator.Application)
            return;

        try{
            if (ReconnectToRMQ()){
                Utils.log.Info("Connected!");
            }
        }
        catch (Exception ex){
            Utils.log.Info("Connect failed!" + ex.Message);
        }
    }

    /// <summary>
    /// Ensures an open connection and an open channel exist, releasing any stale ones
    /// before replacing them.
    /// </summary>
    /// <returns>true when a new channel was created; otherwise false.</returns>
    private bool ReconnectToRMQ(){
        lock (_reconnectLock){
            if (_disposed)
                return false;

            if (_connection == null || _connection.IsOpen == false){
                // The old channel belongs to the old connection; release both so the
                // replaced objects do not stay alive in the background.
                ReleaseStaleChannel();
                ReleaseStaleConnection();

                _connection = _connectionFactory.CreateConnection();
                _connection.ConnectionShutdown += Connection_ConnectionShutdown;
            }

            if (Channel == null || Channel.IsOpen == false){
                ReleaseStaleChannel();
                Channel = _connection.CreateModel();
                return true;
            }
            return false;
        }
    }

    /// <summary>Disposes the current channel, if any, ignoring errors from an already broken channel.</summary>
    private void ReleaseStaleChannel(){
        var stale = Channel;
        Channel = null;
        if (stale == null)
            return;

        try{
            stale.Dispose();
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    /// <summary>Detaches the shutdown handler from the current connection, if any, and disposes it.</summary>
    private void ReleaseStaleConnection(){
        var stale = _connection;
        _connection = null;
        if (stale == null)
            return;

        try{
            stale.ConnectionShutdown -= Connection_ConnectionShutdown;
            stale.Dispose();
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    /// <summary>
    /// Certificate validation callback that accepts every server certificate.
    /// </summary>
    /// <remarks>
    /// NOTE(review): returning true unconditionally disables server certificate
    /// validation - confirm this is acceptable outside of test environments.
    /// </remarks>
    private bool ValidateServerCertificate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
        return true;
    }

    /// <summary>Closes the channel and the connection if they exist.</summary>
    public void DisconnectFromRMQ () {
        Channel?.Close ();
        _connection?.Close ();
    }

    /// <summary>
    /// Closes and disposes the channel and the connection. Safe to call more than once.
    /// </summary>
    public void Dispose(){
        if (_disposed)
            return;
        _disposed = true;

        lock (_reconnectLock){
            // Channel and connection are cleaned up independently so a failure while
            // closing the channel cannot leave the connection open.
            try{
                Channel?.Close();
                Channel?.Dispose();
            }
            catch (Exception e){
                Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
            }
            Channel = null;

            try{
                if (_connection != null)
                    _connection.ConnectionShutdown -= Connection_ConnectionShutdown;
                _connection?.Close();
                _connection?.Dispose();
            }
            catch (Exception e){
                Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
            }
            _connection = null;
        }
    }

    /// <summary>
    /// Publishes a persistent message to the configured exchange. When the connection
    /// or channel is closed it reconnects first and then publishes.
    /// </summary>
    /// <param name="message">Message body.</param>
    /// <param name="routingKey">Routing key for the message.</param>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public void PublishMessages (byte [] message, string routingKey) {
        if (_disposed)
            throw new ObjectDisposedException(nameof(RMQ));

        if (this._connection == null || ! _connection.IsOpen || Channel == null || ! Channel.IsOpen) {
            Utils.log.Error ("PublishMessages(), Connect failed! this.conn == null || !conn.IsOpen ");
            // Previously the message was dropped after reconnecting; now execution
            // falls through and the message is published on the fresh channel.
            ReconnectToRMQ();
        }

        // Capture the channel once so a concurrent reconnect cannot swap it mid-publish.
        var channel = Channel;
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish (_exchangeName, routingKey, properties, message);
        //serviceInstance1.Publish(message, _rabbitOptions.ExchangeName, "", routingKey);
    }
}

现在有趣的是,如果我只将像“test”这样的小字符串发布到 RabbitMQ 到我的预定义队列中,我可以发布超过 1780 个 messages/second。

首先,您似乎阻塞了事件处理线程。 所以,我要做的是将事件处理与实际处理分离:

(未经测试!只是一个大纲!)

已删除错误代码

然后在 serviceInstance1 中,我会让 Publish 将订单排入 BlockingCollection 队列,专用线程正在其上等待。该线程将执行实际发送。因此,无论您在 Processor 中选择做什么,您都会将订单编组到该线程,并且所有订单都将解耦并按顺序排列。

您可能希望根据您的要求设置 BlockOptions。

请注意,这只是一个粗略的轮廓,不是完整的解决方案。您可能还想从那里开始并尽量减少字符串操作等。

编辑

从昨天开始我想到的一些想法没有特别的顺序:

  • 放弃第一个过滤器以稍后过滤掉空的 JObject 集是否有益?
  • 也许值得尝试使用 System.Text.Json instead of Newtonsoft?
  • 有没有更高效的方法把 XML 转成 JSON?(我想到了“XSLT”,但真的不确定)
  • 我建议安装 BenchmarkDotNet 并启用其 MemoryDiagnoser,用来记录/证明您的改动确实带来了正面效果。
  • 不要忘记查看 DataflowBlockOptions,以调整管道的行为。

供参考:

回应问题中的编辑 3:

// Publishing loop quoted from the question: polls concurrentQueue forever and
// sleeps 1 ms whenever it is empty. The loop has no exit condition or cancellation.
Task t2 = Task.Factory.StartNew(() =>
            {
                while (true) { 
                    if (!concurrentQueue.IsEmpty)
                    {
                        JToken number;
                        // Drain everything currently queued before polling again.
                        while (concurrentQueue.TryDequeue(out number))
                        {
                            _rabbitMQ.PublishMessages(
                                Encoding.ASCII.GetBytes(number.ToString()),
                                "test"
                            );
                        }
                    } else
                    {
                        Thread.Sleep(1);
                    }
                }
            });  

确实不是个好主意。

首先,我会在处理发送的服务 class 中执行此操作(serviceInstance1 - 不知道类型)。然后,在将 TPL 与 Thread.Sleep 混合时,您正在使用自旋等待进行紧密循环。那是 2 个 NoNos。这也完全打乱了阻塞队列的意图。即:线程阻塞,直到该队列上有一个项目可用。

也许现在完全放弃这部分是个更好的主意,让管道中的最后一个块执行 serviceInstance1.Publish。这可能是过早的优化。

编辑 2

所以,昨天我做了一些实验,发现:

using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
using System.Xml.Serialization;
using Newtonsoft.Json;
using System.Linq;

namespace DataFlowExperiment.PipelinesLib
{

    /// <summary>
    /// Two-stage dataflow pipeline: XML text is parsed into an <see cref="XDocument"/>,
    /// which is then expanded into JSON strings. Non-empty JSON strings are offered to
    /// the optional result callback; anything it does not take is discarded.
    /// </summary>
    public class PipelineOne
    {
        private readonly IPipelineOneSteps steps;

        private readonly TransformBlock<string, XDocument> startBlock; // XML text -> parsed document
        private readonly TransformManyBlock<XDocument, string> toJsonMessagesBlock; // document -> JSON strings
        private readonly ITargetBlock<string> resultCallback;
        // True when the caller supplied a result target; decides what Close() waits on.
        private readonly bool hasResultCallback;

        /// <summary>
        /// Creates and links the pipeline blocks.
        /// </summary>
        /// <param name="steps">Implementation of the two transformation steps.</param>
        /// <param name="resultCallback">Optional target for the produced JSON strings; when null the results are dropped.</param>
        /// <exception cref="ArgumentNullException"><paramref name="steps"/> is null.</exception>
        public PipelineOne(IPipelineOneSteps steps, ITargetBlock<string> resultCallback = null)
        {
            this.steps = steps ?? throw new ArgumentNullException(nameof(steps));

            startBlock = new TransformBlock<string, XDocument>(steps.Start);
            toJsonMessagesBlock = new TransformManyBlock<XDocument, string>(steps.ToJson);

            hasResultCallback = resultCallback != null;
            this.resultCallback = resultCallback ?? DataflowBlock.NullTarget<string>();

            startBlock.LinkTo(toJsonMessagesBlock, new DataflowLinkOptions { PropagateCompletion = true });
            toJsonMessagesBlock.LinkTo(this.resultCallback, new DataflowLinkOptions { PropagateCompletion = true }, x => !string.IsNullOrEmpty(x));
            // Catch-all so items rejected by the filter above do not clog the block.
            toJsonMessagesBlock.LinkTo(DataflowBlock.NullTarget<string>(), new DataflowLinkOptions { PropagateCompletion = true });
        }

        /// <summary>Posts one XML text to the start of the pipeline.</summary>
        public void Post(string input)
        {
            startBlock.Post(input);
        }

        /// <summary>
        /// Signals that no more input will arrive.
        /// </summary>
        /// <returns>
        /// A task that completes when the result callback has completed or, when no
        /// callback was supplied, when the last pipeline block has completed.
        /// </returns>
        public Task Close()
        {
            startBlock.Complete();

            // The Completion task of a NullTarget block never finishes, so awaiting it
            // would hang forever when no callback was supplied; wait on the final
            // pipeline block instead in that case.
            return hasResultCallback ? resultCallback.Completion : toJsonMessagesBlock.Completion;
        }
    }

    /// <summary>
    /// The two transformation steps used by <c>PipelineOne</c>.
    /// </summary>
    public interface IPipelineOneSteps
    {
        /// <summary>First step: turns the input text into an <see cref="XDocument"/>.</summary>
        public XDocument Start(string input);
        /// <summary>Second step: produces zero or more strings from a document.</summary>
        public IEnumerable<string> ToJson(XDocument doc);
    }

    /// <summary>
    /// Default pipeline steps: parse the XML text, then emit one JSON string per
    /// ORDER element directly under the document root.
    /// </summary>
    public class PipelineOneSteps : IPipelineOneSteps
    {
        // Namespace of the ORDER elements; resolved once instead of on every call.
        private static readonly XNamespace OrderNamespace = "api-com";

        /// <summary>Creates the steps. No state is required.</summary>
        public PipelineOneSteps()
        {
            // The JsonSerializer previously created here was never used and has been removed.
        }

        /// <summary>Parses the XML text into a document.</summary>
        /// <param name="input">XML document text.</param>
        public XDocument Start(string input)
        {
            XDocument doc = XDocument.Parse(input);
            return doc;
        }

        /// <summary>
        /// Lazily yields the JSON serialization of each ORDER element (in the
        /// "api-com" namespace) that is a direct child of the document root.
        /// </summary>
        /// <param name="doc">Parsed document.</param>
        public IEnumerable<string> ToJson(XDocument doc)
        {
            var orders = doc.Root.Elements(OrderNamespace + "ORDER");

            foreach (var order in orders)
            {
                yield return JsonConvert.SerializeXNode(order);
            }
        }
    }
}

此基准测试的结果:


BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19041.867 (2004/?/20H1)
Intel Core i9-10885H CPU 2.40GHz, 1 CPU, 16 logical and 8 physical cores
.NET Core SDK=5.0.202
  [Host]     : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
  DefaultJob : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT


Method N Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
PipeLineOneBenchmark 1000 25.00 μs 0.269 μs 0.252 μs - - - -
PipeLineOneBenchmark 100000 2,491.42 μs 13.655 μs 15.177 μs - - - -

这与您的解决方案相似但不同。

不过,这让我觉得,实际问题出在别处。 (仍在努力并准备更新。)

我想到可以用一个小工具来检查是不是你的 RabbitMQ 发布太慢,导致消息大量积压:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace DataFlowExperiment.PipelinesLib
{
    /// <summary>
    /// Decouples message producers from the RabbitMQ publish call: messages posted to
    /// <see cref="Receiver"/> are queued and sent one by one, in order, by a dedicated
    /// thread, which also reports queue-size statistics after each batch.
    /// </summary>
    public class RabbitWrapper : IDisposable
    {
        private readonly int batchSize = 10;

        private readonly Thread senderThread;
        private readonly BlockingCollection<string> messages;
        private readonly ActionBlock<string> receiver;
        private readonly CancellationTokenSource stoppingToken;
        private readonly RabbitWrapperStats stats;
        // Set once Dispose has run so Close/Dispose can be called repeatedly.
        private bool disposed;

        /// <summary>
        /// Target block to link or post messages to. Exposed publicly because the
        /// wrapper otherwise has no entry point for incoming messages.
        /// </summary>
        public ITargetBlock<string> Receiver => receiver;

        /// <summary>Creates the queue and statistics and starts the sender thread.</summary>
        public RabbitWrapper()
        {
                                                                // Drop in your logging here
            stats = new RabbitWrapperStats(new Progress<string>(x => Console.WriteLine(x)));
            stoppingToken = new CancellationTokenSource();
            messages = new BlockingCollection<string>();
            receiver = new ActionBlock<string>(Receive);
            senderThread = new Thread(HandleQueue);
            senderThread.Start();
        }

        /// <summary>Action-block callback: appends the message to the send queue.</summary>
        private void Receive(string message)
        {
            messages.Add(message);
        }

        /// <summary>
        /// Sender-thread loop: blocks until a message is available, sends up to
        /// <c>batchSize</c> messages while the queue is non-empty, then checks statistics.
        /// Returns when cancellation is requested.
        /// </summary>
        private void HandleQueue()
        {
            try
            {
                while (!stoppingToken.Token.IsCancellationRequested)
                {
                    int batchIndex = 0;
                    do {
                        string message = messages.Take(stoppingToken.Token);
                        if (!string.IsNullOrEmpty(message))
                        {
                            SendToRabbit(message);
                        }
                        batchIndex++;
                    } while (!stoppingToken.Token.IsCancellationRequested &&
                             batchIndex < batchSize &&
                             messages.Count > 0);
                    // Check statistics after each batch (at most batchSize messages).
                    CheckStats(messages.Count);
                }
            }
            catch (OperationCanceledException)
            {
                // Close() cancelled a blocking Take(): this is the normal shutdown path.
                // Left unhandled, the exception would escape the thread and bring the
                // whole process down.
            }
        }

        /// <summary>Placeholder for the actual publish call.</summary>
        private void SendToRabbit(string message)
        {
            // rabbit Publish goes here.
        }

        /// <summary>Forwards the current queue size to the statistics tracker.</summary>
        private void CheckStats(int count)
        {
            stats.CheckStats(count);
        }

        /// <summary>Requests the sender thread to stop and waits for it to finish.</summary>
        public void Close()
        {
            if (disposed)
            {
                return;
            }

            this.stoppingToken.Cancel();
            senderThread.Join();
        }

        /// <summary>Stops the sender thread and releases the cancellation source.</summary>
        public void Dispose()
        {
            if (disposed)
            {
                return;
            }

            Close();
            disposed = true;
            // Safe to dispose now: the sender thread, the only reader of the token, has exited.
            stoppingToken.Dispose();
        }
    }

    /// <summary>
    /// Tracks the last ten reported queue sizes and reports, through an optional
    /// <see cref="IProgress{T}"/>, when the queue crosses the warning or critical
    /// threshold and when the rolling mean falls back below 75% of the warning level.
    /// </summary>
    internal class RabbitWrapperStats
    {
        // You may want to play around with these thresholds
        // I pulled them out of thin air ...
        const int SIZE_WARN = 500000;
        const int SIZE_CRITICAL = SIZE_WARN * 2;

        // Ring buffer of the most recent samples and the slot the next sample goes to.
        private int nextSlot = 0;
        private int[] recentSizes = new int[10] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
        private int meanSizeLastTen = 0;
        private int previousMean = 0;
        // -1, 0 or 1: how the latest mean compares to the previous one.
        private int tendency = 0;

        private bool HasWarned = false;
        private bool HasPanicked = false;

        private readonly IProgress<string> progress;

        /// <param name="progress">Receiver of the status messages; may be null.</param>
        public RabbitWrapperStats(IProgress<string> progress)
        {
            this.progress = progress;
        }

        /// <summary>
        /// Records a queue-size sample and emits at most one report: critical, warning,
        /// or recovery, checked in that order.
        /// </summary>
        /// <param name="queueSize">Current number of queued messages.</param>
        public void CheckStats(int queueSize)
        {
            UpdateLastTen(queueSize);

            bool newlyCritical = !HasPanicked && queueSize > SIZE_CRITICAL;
            if (newlyCritical)
            {
                Panic(queueSize);
            }
            else if (!HasWarned && queueSize > SIZE_WARN)
            {
                Warn(queueSize);
            }
            else if ((HasPanicked || HasWarned) && meanSizeLastTen < SIZE_WARN * 0.75)
            {
                // Back to normal: clear both flags so future crossings report again.
                HasPanicked = false;
                HasWarned = false;
                progress?.Report($"INFO Mean size of last 10 Samples sinks below {SIZE_WARN * 0.75} : {meanSizeLastTen}");
            }
        }

        /// <summary>Marks the warning as issued and reports it.</summary>
        private void Warn(int size)
        {
            HasWarned = true;
            progress?.Report($"WARNING QueueSize = {size}");
        }

        /// <summary>Marks the critical state as reported and reports it.</summary>
        private void Panic(int size)
        {
            HasPanicked = true;
            progress?.Report($"!! CRITICAL !! QueueSize = {size}");
        }

        /// <summary>
        /// Stores the sample in the ring buffer, advances the slot, and refreshes the
        /// rolling mean and its tendency relative to the previous mean.
        /// </summary>
        private void UpdateLastTen(int value)
        {
            recentSizes[nextSlot] = value;
            nextSlot = (nextSlot + 1) % recentSizes.Length;

            int currentMean = recentSizes.Sum() / recentSizes.Length;
            tendency = currentMean.CompareTo(previousMean);
            meanSizeLastTen = currentMean;
            previousMean = currentMean;
        }
    }
}