RabbitMQ Producer C# .NET Core 5.0 内存泄漏
RabbitMQ Producer C# .NET Core 5.0 Memory Leak
我想知道是否有人可以帮助解决以下情况:
我无法使用用 C# 编写并使用 .Net core 5.0 的 RabbitMQ Publisher 解决内存泄漏。
这是 csproj 文件:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
</PropertyGroup>
...
</Project>
我在虚拟机中运行一个 .NET 控制台应用程序,它通过 API(一个已注册的 64 位 DLL,以 COM 引用的方式引用)连接到服务器,从该服务器获取信息,然后尝试将这些信息发布到位于 AWS 云上的 RabbitMQ 机器(一个负载均衡器,后面是该 RMQ 实例的多个节点)。
访问代码中的API是通过以下方式完成的:
/// <summary>
/// Configures the API connection options and attaches this instance's handlers to the API events.
/// </summary>
private void SetUpApi () {
    // Apply the options first; whatever the API returns is written to the info log.
    var optionsReply = _api.SetAPIOptions ("<CONNECTIONOPTIONS><CALCULATED_PRICES Enabled='true' MaximumDepth='4'/></CONNECTIONOPTIONS>");
    Utils.log.Info (optionsReply);

    // Connection lifecycle callbacks, then the data feed itself.
    _api.OnServerConnect += OnServerConnect;
    _api.OnServerDisconnect += OnServerDisconnect;
    _api.OnNewData += OnNewData;
}
/// <summary>
/// Handler for the API's OnNewData event. Payloads containing "&lt;ORDER" are parsed and
/// published through ParseXMLAnswer; all other payloads are ignored. Exceptions are logged
/// and not rethrown, so a failed publish never propagates back into the API's event dispatch.
/// </summary>
/// <param name="strXML">Raw XML payload from the API; null or empty payloads are ignored.</param>
private void OnNewData(string strXML){
    if (string.IsNullOrEmpty(strXML) || !strXML.Contains("<ORDER"))
        return;
    try{
        ParseXMLAnswer(strXML, "OnNewData ()");
    }
    catch (Exception ex) {
        // The broker exception can arrive unwrapped (the logged trace shows AlreadyClosedException
        // thrown straight out of CreateModel), so test the exception itself as well as its inner
        // exception. Otherwise a full stack trace is printed for every message while the broker is down.
        Exception brokerFailure = FindBrokerFailure(ex);
        if (brokerFailure != null)
            Utils.log.Error("OnNewData () " + brokerFailure.GetType().FullName + " ");
        else
            Utils.printException("OnNewData ()", ex);
    }
}

// Returns ex, or its direct inner exception, when either reports a closed RabbitMQ
// connection/channel or an unreachable broker; otherwise returns null.
private static Exception FindBrokerFailure(Exception ex){
    if (ex is AlreadyClosedException || ex is BrokerUnreachableException)
        return ex;
    if (ex.InnerException is AlreadyClosedException || ex.InnerException is BrokerUnreachableException)
        return ex.InnerException;
    return null;
}
/// <summary>
/// Converts one API XML payload to JSON and publishes every ORDER element it contains
/// through SendOrderToRMQ.
/// </summary>
/// <param name="strOutputXML">XML payload; the sample messages use an APIDATA root element.</param>
/// <param name="caller">Name of the calling method; not used, kept for signature compatibility.</param>
/// <exception cref="XmlException">The payload is not well-formed XML.</exception>
private void ParseXMLAnswer(string strOutputXML, string caller) {
    XmlDocument doc = new XmlDocument();
    doc.LoadXml(strOutputXML);
    JObject o = JObject.Parse(JsonConvert.SerializeXmlNode(doc));

    // SerializeXmlNode yields an object for a single ORDER element and an array for several.
    // A missing APIDATA or ORDER node yields null, and nothing is published.
    JToken orderToken = o["APIDATA"]?["ORDER"];
    if (orderToken is JObject singleOrder) {
        SendOrderToRMQ(singleOrder);
    }
    else if (orderToken is JArray orders) {
        foreach (JObject order in orders.Children<JObject>()) {
            SendOrderToRMQ(order);
        }
    }
    // Deliberately no GC.Collect()/GC.WaitForPendingFinalizers(): a blocking full collection on
    // every message (hundreds per second) stalls this event thread. It also cannot free objects
    // that are still referenced; unreferenced ones are reclaimed by the GC on its own.
}
/// <summary>
/// Publishes one ORDER to RabbitMQ when its first sequence item is tracked in sequenceItemsHashed.
/// The routing key is
/// "{InstName}.{sequencesFromInstruments[FirstSequenceID]}.{FirstSequenceItemName}.{1-based position of the item}".
/// </summary>
/// <param name="order">JSON form of one ORDER element; XML attributes appear as "@"-prefixed keys.</param>
private void SendOrderToRMQ (JObject order){
    // An order without an INSTSPECIFIER object (or a null order) cannot be routed.
    if (!(order?["INSTSPECIFIER"] is JObject instrSpeciefier))
        return;

    string firstSeqID = instrSpeciefier.GetValue("@FirstSequenceID")?.ToString();
    string firstSeqItemID = instrSpeciefier.GetValue("@FirstSequenceItemID")?.ToString();
    if (firstSeqID == null || firstSeqItemID == null)
        return;

    // One dictionary lookup and one list scan, instead of ContainsKey + indexer + Contains + IndexOf.
    if (!sequenceItemsHashed.TryGetValue(firstSeqID, out var sequenceItems))
        return;
    int position = sequenceItems.IndexOf(firstSeqItemID);
    if (position < 0)
        return;

    string itemName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@FirstSequenceItemName").ToString());
    string instrumentName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@InstName").ToString());
    int index = position + 1;
    var binding = instrumentName + "." + sequencesFromInstruments[firstSeqID] + "." + itemName + "." + index;
    serviceInstance1.Publish(
        order.ToString(),
        _exchangeName,
        "",
        binding);
}
在营业高峰时段,我每秒从 API 收到大约 400–500 条消息。这些消息是 XML 格式的,一条消息可以包含多个订单,如下例所示:其中一个元素可能是插入(创建)订单的操作,另一个元素可能是删除某个特定订单的操作。
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER EngineID="0" PersistentOrderID="2791" ...>
<INSTSPECIFIER InstID="287" ... />
...
</ORDER>
<ORDER EngineID="0" PersistentOrderID="9840" ...>
<INSTSPECIFIER InstID="288" ... />
...
</ORDER>
RabbitMQ 服务器已配置,我使用 SSL(带有证书和密钥)连接到它。
我使用RabbitMQ.Client v6.2.1 连接到RMQ。
交换、队列和绑定已经在 RabbitMQ 中定义。我的 Producer 应用程序只连接到它并开始发布。
使用同步发布方法对我来说很重要,因为消息的顺序至关重要。例如,一条消息要求创建订单,紧随其后的另一条消息要求删除同一个订单。如果我使用异步方法发布到 RMQ,删除操作可能会先于插入操作到达。
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Insert" ...>
...
</ORDER>
删除消息:
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Remove" ...>
...
</ORDER>
我使用以下方法发布到 RMQ:对象池(Microsoft 提供了一个名为 Microsoft.Extensions.ObjectPool 的包)- 此处描述的方法 - https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/ .
我在这里使用以下代码:
/// <summary>
/// Publishes messages on RabbitMQ channels borrowed from a DefaultObjectPool, so each publish
/// has exclusive use of its channel for the duration of the call.
/// </summary>
class RabbitManager : IRabbitManager
{
    private readonly DefaultObjectPool<IModel> _objectPool;

    /// <summary>Creates the channel pool, retaining at most two idle channels per logical processor.</summary>
    /// <param name="objectPolicy">Creates channels and decides whether a returned channel is kept.</param>
    public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy){
        // NOTE(review): DefaultObjectPool appears to drop (not dispose) objects returned while the
        // pool is already full. If more channels than this are ever in use concurrently, the
        // surplus ones stay open on the connection — confirm against the ObjectPool version used.
        _objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
    }

    /// <summary>
    /// Encodes message.ToString() as UTF-8 and publishes it as a non-persistent
    /// "application/json" message. A null message is ignored.
    /// </summary>
    /// <param name="message">Payload; its ToString() result is sent.</param>
    /// <param name="exchangeName">Target exchange.</param>
    /// <param name="exchangeType">Not used; kept for IRabbitManager compatibility.</param>
    /// <param name="routeKey">Routing key.</param>
    /// <remarks>
    /// Client exceptions propagate unchanged with their original stack trace (the previous
    /// <c>catch { throw ex; }</c> reset it).
    /// </remarks>
    public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey) where T : class {
        if (message == null)
            return;
        var channel = _objectPool.Get();
        try{
            var sendBytes = Encoding.UTF8.GetBytes(message.ToString());
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            // NOTE(review): AMQP 0-9-1 defines the timestamp property in seconds; this sends
            // milliseconds. Left unchanged because consumers may already rely on it.
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
            channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);
        }
        finally {
            // Always hand the channel back, even after a failure; with RabbitModelPooledObjectPolicy
            // a channel that is no longer open is disposed instead of being pooled.
            _objectPool.Return(channel);
        }
    }
}
/// <summary>
/// Object-pool policy that creates RabbitMQ channels on one shared, auto-recovering connection
/// and keeps only channels that are still open when they are returned.
/// </summary>
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
    // RabbitMQ's heartbeat guide recommends roughly 5-20 s; values below 5 s are prone to
    // false-positive disconnects. NOTE(review): the previous 1 s across a WAN/load balancer may
    // contribute to the "End of stream" shutdowns in the log — confirm against broker logs.
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(10);

    // TLS policy errors tolerated for the broker certificate; any other error fails the handshake.
    private const SslPolicyErrors AcceptedSslPolicyErrors =
        SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;

    private readonly RabbitOptions _options;
    private readonly IConnection _connection;

    /// <summary>Stores the options and opens the shared connection immediately.</summary>
    /// <param name="_options">Broker host, credentials, virtual host and optional TLS certificate settings.</param>
    public RabbitModelPooledObjectPolicy(RabbitOptions _options){
        this._options = _options;
        _connection = GetConnection();
    }

    // Builds a connection factory from _options (TLS only when CertPath is set) and connects.
    private IConnection GetConnection() {
        var factory = new ConnectionFactory() {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            VirtualHost = _options.VHost,
        };
        if (!String.IsNullOrEmpty(_options.CertPath))
        {
            factory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
            factory.Ssl.AcceptablePolicyErrors = AcceptedSslPolicyErrors;
            // Assigned (not +=) to a callback that enforces AcceptablePolicyErrors. The previous
            // callback returned true unconditionally and so accepted any certificate at all.
            factory.Ssl.CertificateValidationCallback = new RemoteCertificateValidationCallback(ValidateServerCertificate);
            factory.Ssl.ServerName = _options.HostName;
            factory.Ssl.CertPath = _options.CertPath;
            factory.Ssl.CertPassphrase = _options.CertPass;
            factory.Ssl.Version = SslProtocols.Tls12;
            factory.Ssl.Enabled = true;
        }
        factory.RequestedHeartbeat = HeartbeatInterval;
        factory.AutomaticRecoveryEnabled = true; // enable automatic connection recovery
        factory.RequestedChannelMax = 32;
        // Named "connection" so it no longer shadows the _connection field.
        var connection = factory.CreateConnection();
        connection.ConnectionShutdown += Connection_ConnectionShutdown;
        return connection;
    }

    // Logs connection shutdowns; reconnecting is left to the client's automatic recovery.
    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info("Connection broke!");
    }

    // Accepts the server certificate only if every reported policy error is in AcceptedSslPolicyErrors.
    private static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors){
        return (sslPolicyErrors & ~AcceptedSslPolicyErrors) == SslPolicyErrors.None;
    }

    /// <summary>Opens a new channel on the shared connection.</summary>
    public IModel Create(){
        return _connection.CreateModel();
    }

    /// <summary>Keeps open channels; disposes closed or null ones and tells the pool to drop them.</summary>
    /// <returns>true when the channel may be pooled again.</returns>
    public bool Return(IModel obj) {
        if (obj != null && obj.IsOpen)
            return true;
        obj?.Dispose();
        return false;
    }
}
下面是有问题的屏幕截图 - 内存不断增加:
这是在上面的屏幕截图之后拍摄的内存快照的堆栈跟踪:
截取上面的屏幕截图后,我的程序在控制台中收到以下错误消息:
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
这导致程序最终崩溃:
我做了什么来防止内存泄漏 :
- 让相关的类实现了 IDisposable 接口:
/// <summary>Releases managed resources and detaches this instance from the API events.</summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Disposes the hourly timer and unsubscribes the API event handlers.
/// </summary>
/// <param name="disposing">true when called from Dispose(); false when called from a finalizer.</param>
protected virtual void Dispose(bool disposing)
{
    if (disposing)
    {
        // free managed resources
        _onTimePerHour?.Dispose();
        // Detach every handler that SetUpApi attaches (assumes SetUpApi lives in this class).
        // A handler left attached keeps this instance reachable from the API object.
        if (_api != null)
        {
            _api.OnServerConnect -= OnServerConnect;
            _api.OnServerDisconnect -= OnServerDisconnect;
            _api.OnNewData -= OnNewData;
        }
    }
    // free native resources if there are any.
}
- 收到每条消息后强制进行垃圾回收:
// Excerpt of ParseXMLAnswer ("..." elides the parsing/publishing) showing the forced
// collection that was added after every message.
private void ParseXMLAnswer(string strOutputXML, string caller) {
...
// Clearing a local that is about to go out of scope does not change what is reachable.
doc = null;
// NOTE(review): a blocking full GC per message (hundreds per second) is expensive, and it
// cannot reclaim objects that are still referenced elsewhere; consider removing both calls.
GC.Collect();
GC.WaitForPendingFinalizers();
}
这有点帮助,现在我的内存问题在较长时间后增加了。
- 我使用了 Visual Studio 的 ReSharper 插件,希望借助它更好地理解内存问题的堆栈跟踪,但帮助不大。
我觉得是什么问题 :
RabbitMQ Producer 应用程序每秒获取许多消息,然后对这些消息进行解析,拆分为多个 JSON 消息并使用相同的通道发送到 RMQ。可能会出现以下情况:
- 我只在一个 RMQ 通道上发布,也许应该使用多个通道(一个连接、多个通道);
- 我收到的消息多于我可以使用 RabbitMQ.Client .net 库通过 RMQ 解析和发送的消息
- 我在内存中保留了一些未释放的对象(可能是消息)的引用;
以前有人遇到过这个问题吗?因为我在任何地方都找不到有关此“SingleProducerSingleConsumerQueue+Segment 内存不足 ”问题的任何信息。
有人知道如何更深入地分析这个问题吗?
非常感谢!
编辑 1
我想解决这个内存问题需要更多信息。
我有几个消费者使用来自 RabbitMQ 的数据(比如 NodeJS 和 python 应用程序)。因此,我需要以通用方式设计 RabbitMQ 生产者,因为每个消费者需要不同的数据。每次我有一个新的消费者应用程序时,我都无法修改和重新启动我的 RabbitMQ 生产者。所以我需要以通用的方式发布我的消息。
例如,每个消费者都有自己的专用队列,具有专用绑定。假设我有带有队列 cons1 和绑定的 consumer1 :
- marketName.productName.*.1(productName对应天数).
此绑定是动态的,目前它对应于星期一 (04.April),但明天它将对应于星期二 (05.April)。
所以我需要使用
将市场名称和产品名称存储在内存中
// Keyed by first-sequence ID (the "marketNames" side); each value lists sequence item IDs, and an
// item's 1-based position in that list becomes the last routing-key segment.
private static readonly Dictionary<string, List<string>> sequenceItemsHashed = new Dictionary<string, List<string>>();
// Keyed by first-sequence ID; the value (the "productNames" side) is the second routing-key segment.
private static readonly Dictionary<string, string> sequencesFromInstruments = new Dictionary<string, string>();
需要说明的是,在我的逻辑中,sequenceItemsHashed 对应 marketNames,sequencesFromInstruments 对应 productNames。
通过这种方式,我将所有消息发送到 RMQ,然后使用绑定在 RMQ 中进行排序。
编辑 2
据我了解,要解决这个问题,我需要类似下面这样的架构:
即到 RMQ 服务器只建立一个连接,连接内有多个线程,每个线程使用各自的通道。
编辑 3
已成功实施管道、ConcurrentQueue 和推送到 RMQ 的消费者线程,但仍然存在内存问题:
// Edit-3 variant: Dataflow pipeline (filter -> XML-to-JObject -> split ORDER tokens -> enqueue)
// plus a background task that drains the queue and publishes to RabbitMQ.
private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, JObject> xmlParser;
//private readonly TransformBlock<XmlDocument, JObject> xmlToJsonTransformer;
private readonly TransformManyBlock<JObject, JToken> jsonOrderFactory;
private readonly ActionBlock<JToken> messageSender;
// Hand-off buffer between the pipeline's last block (SendMessage enqueues) and the task below.
// NOTE(review): ConcurrentQueue is unbounded; if publishing is slower than the inflow it grows
// without limit, which is a likely contributor to the memory growth.
ConcurrentQueue<JToken> concurrentQueue = new ConcurrentQueue<JToken>();
public Service1 (string [] args) {
...
// setup pipeline blocks
orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
xmlParser = new TransformBlock<string, JObject>(ParseXml);
jsonOrderFactory = new TransformManyBlock<JObject, JToken>(CreateOrderMessages);
messageSender = new ActionBlock<JToken>(SendMessage);
// build your pipeline
// NOTE(review): only the last link sets PropagateCompletion, so completing orderFilter would not
// complete xmlParser or the blocks after it.
orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
xmlParser.LinkTo(jsonOrderFactory);
jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });
// NOTE(review): busy-wait consumer. It polls the queue forever (no exit or cancellation), sleeps
// 1 ms whenever the queue is empty, and occupies a thread-pool thread permanently. A
// BlockingCollection<T> consumer, or publishing directly from the ActionBlock, avoids this.
Task t2 = Task.Factory.StartNew(() =>
{
while (true) {
if (!concurrentQueue.IsEmpty)
{
JToken number;
while (concurrentQueue.TryDequeue(out number))
{
// NOTE(review): Encoding.ASCII replaces every non-ASCII character with '?'.
_rabbitMQ.PublishMessages(
Encoding.ASCII.GetBytes(number.ToString()),
"test"
);
}
} else
{
Thread.Sleep(1);
}
}
});
...
}
/// <summary>
/// Pipeline filter stage: passes the payload through unchanged when it mentions an ORDER
/// element and yields null otherwise.
/// </summary>
private string FilterIncomingMessages(string strXml) =>
    strXml.Contains("<ORDER") ? strXml : null;
/// <summary>Parses an XML payload and returns its Json.NET representation.</summary>
private JObject ParseXml(string strXml){
    var document = new XmlDocument();
    document.LoadXml(strXml);
    string json = JsonConvert.SerializeXmlNode(document);
    return JObject.Parse(json);
}
/// <summary>
/// Splits a parsed GV8APIDATA payload into its individual ORDER tokens. A single ORDER element
/// serializes to an object and several to an array; both shapes are handled.
/// </summary>
/// <returns>The ORDER tokens in document order; empty when there are none.</returns>
private IEnumerable<JToken> CreateOrderMessages(JObject o){
    // A missing root or ORDER node yields null and therefore no messages. Previously
    // ["ORDER"].Type threw, and an exception in a TransformManyBlock delegate faults the block,
    // which stops the whole pipeline.
    JToken orderNode = o?["GV8APIDATA"]?["ORDER"];
    switch (orderNode?.Type) {
        case JTokenType.Object:
            return new[] { orderNode };
        case JTokenType.Array:
            return new List<JToken>(orderNode.Children());
        default:
            return Array.Empty<JToken>();
    }
}
/// <summary>Final pipeline stage: hands one ORDER token to concurrentQueue for publishing.</summary>
private void SendMessage(JToken order) => concurrentQueue.Enqueue(order);
新的解决方案有助于将逻辑分成几个小部分,但我的内存仍然在不断增加。
编辑 4
考虑到@Fildor 的回答,我做了以下事情:
我没有将包含 xml 和 元素的字符串转换为 JSON,而是使用管道和下面的代码将 XML 反序列化为对象。
我删除了 Thread 和 ConcurrentQueue 的部分,我直接在最后一个 ActionBlock 中发布。
这解决了我的内存泄漏问题,但还有其他问题,例如:
- 如果消息较大,我每秒只能发布大约 120 条消息;如果只发布简单的字符串“test”,则能达到每秒 1780 条。
// Edit-4 constructor excerpt ("..." elides other setup): builds the pipeline
// filter -> XmlSerializer deserialize -> split orders -> publish, then runs as a console app.
public Service1 (string [] args) {
...
// setup pipeline blocks
// NOTE(review): all blocks use default options, so their input buffers are unbounded
// (BoundedCapacity is unlimited by default); a slow publisher lets the backlog grow in memory.
orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
xmlParser = new TransformBlock<string, OrdersResponse>(ParseXml);
jsonOrderFactory = new TransformManyBlock<OrdersResponse, Order>(CreateOrderMessages);
messageSender = new ActionBlock<Order>(SendMessage);
// build your pipeline
// NOTE(review): only the last link propagates completion; the first two links do not.
orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
xmlParser.LinkTo(jsonOrderFactory);
jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });
RunAsConsole(args);
}
// Stage 1: keeps only payloads that mention an ORDER element.
private readonly TransformBlock<string, string> orderFilter;
// Stage 2: deserializes the XML payload into an OrdersResponse.
private readonly TransformBlock<string, OrdersResponse> xmlParser;
// Stage 3: fans a response out into individual orders.
private readonly TransformManyBlock<OrdersResponse, Order> jsonOrderFactory;
// Stage 4: publishes each order to RabbitMQ.
private readonly ActionBlock<Order> messageSender;

/// <summary>
/// API callback: hands the raw payload to the first pipeline block without waiting for
/// parsing or publishing, so the API's event thread is not held up.
/// </summary>
/// <param name="strXML">Raw XML payload from the API.</param>
private void OnNewData(string strXML){
    // Post returns false when orderFilter no longer accepts input (completed or faulted);
    // log that instead of losing the message silently.
    if (!orderFilter.Post(strXML))
        Utils.log.Error("OnNewData () pipeline declined the message; it was dropped.");
}
/// <summary>
/// Pipeline filter stage: passes the payload through unchanged when it mentions an ORDER
/// element and yields null otherwise.
/// </summary>
private string FilterIncomingMessages(string strXml) =>
    strXml.Contains("<ORDER") ? strXml : null;
/// <summary>Pipeline parse stage: deserializes the XML payload via DeserializeOrdersFromXML.</summary>
private OrdersResponse ParseXml(string strXml) => DeserializeOrdersFromXML(strXml);
// One serializer for the lifetime of the process instead of one per message (hundreds per
// second). XmlSerializer is documented as thread-safe. Lazy keeps any construction error at
// first use rather than in the type initializer.
private static readonly Lazy<XmlSerializer> s_ordersResponseSerializer =
    new Lazy<XmlSerializer>(() => new XmlSerializer(typeof(OrdersResponse)));

/// <summary>Deserializes one XML payload into an <c>OrdersResponse</c>.</summary>
/// <param name="strOutputXML">The XML payload.</param>
/// <returns>The deserialized root object.</returns>
/// <exception cref="InvalidOperationException">The XML is malformed or does not match OrdersResponse.</exception>
private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
    using (TextReader reader = new StringReader(strOutputXML)) {
        return (OrdersResponse)s_ordersResponseSerializer.Value.Deserialize(reader);
    }
}
/// <summary>Pipeline fan-out stage: yields the individual orders of one response.</summary>
/// <returns>The response's orders, or an empty sequence when the response or its order collection is null.</returns>
private IEnumerable<Order> CreateOrderMessages(OrdersResponse o){
    // XmlSerializer may leave a collection member null when no ORDER element is present;
    // return an empty sequence rather than relying on how TransformManyBlock treats null.
    return (IEnumerable<Order>)o?.orders ?? Array.Empty<Order>();
}
/// <summary>Final pipeline stage: publishes one order to RabbitMQ with routing key "test".</summary>
/// <param name="order">Order to publish; null is ignored so the ActionBlock is not faulted.</param>
private void SendMessage(Order order) {
    if (order == null)
        return;
    // UTF-8 instead of ASCII: Encoding.ASCII silently replaces every non-ASCII character with '?'.
    // NOTE(review): the visible part of Order does not override ToString(); unless an override
    // exists among its elided members, this publishes the type name, not the order's data.
    _rabbitMQ.PublishMessages(
        Encoding.UTF8.GetBytes(order.ToString()),
        "test"
    );
}
ORDER 对象看起来像:
// XmlSerializer model for one ORDER element: XML attributes map to string properties and the
// INSTSPECIFIER child element maps to InstrumentSpecifier. Members are elided ("...") here.
[Serializable()]
[XmlRoot (ElementName = "ORDER")]
public class Order : IDisposable {
// NOTE(review): this Dispose releases no resources; it only nulls managed references, which does
// not make memory reclaimable any sooner once the Order itself is unreachable. It also throws
// NullReferenceException when InstrumentSpecifier is null (no INSTSPECIFIER element), and no
// finalizer is visible for GC.SuppressFinalize to suppress. Consider dropping IDisposable.
public void Dispose()
{
EngineID = null;
PersistentOrderID = null;
...
InstrumentSpecifier.Dispose();
InstrumentSpecifier = null;
GC.SuppressFinalize(this);
}
// Value of the ORDER element's EngineID attribute.
[XmlAttribute (AttributeName = "EngineID")]
public string EngineID { get; set; }
// Value of the ORDER element's PersistentOrderID attribute.
[XmlAttribute (AttributeName = "PersistentOrderID")]
public string PersistentOrderID { get; set; }
...
// The nested INSTSPECIFIER element.
// NOTE(review): no ToString() override is visible; the pipeline publishes order.ToString(), which
// yields the type name unless an override exists among the elided members.
[XmlElement(ElementName = "INSTSPECIFIER")]
public InstrumentSpecifier InstrumentSpecifier { get; set; }
}
还有我的新 RabbitMQ class :
/// <summary>
/// Owns one RabbitMQ connection plus one channel and publishes raw message bodies to the
/// configured exchange.
/// </summary>
/// <remarks>
/// Connection loss caused by the broker or network is handled by the client's automatic recovery
/// (AutomaticRecoveryEnabled = true). This class only opens a connection when it has none, and only
/// opens a new channel when the current one is closed while the connection is up.
/// IModel is not thread-safe: do not call PublishMessages concurrently.
/// </remarks>
public class RMQ : IDisposable {
    // RabbitMQ's heartbeat guide recommends roughly 5-20 s; values below 5 s (the previous
    // setting was 1 s) are prone to false-positive disconnects.
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(10);

    // TLS policy errors tolerated for the broker certificate; any other error fails the handshake.
    private const SslPolicyErrors AcceptedSslPolicyErrors =
        SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;

    private IConnection _connection;
    public IModel Channel { get; private set; }
    private readonly ConnectionFactory _connectionFactory;
    private readonly string _exchangeName;

    /// <summary>
    /// Configures the connection factory and opens the connection and channel. Connection
    /// failures are logged rather than thrown; PublishMessages retries later.
    /// </summary>
    /// <param name="_rabbitOptions">Broker host, credentials, vhost, exchange and optional TLS settings.</param>
    public RMQ (RabbitOptions _rabbitOptions){
        try{
            _connectionFactory = new ConnectionFactory()
            {
                HostName = _rabbitOptions.HostName,
                UserName = _rabbitOptions.UserName,
                Password = _rabbitOptions.Password,
                VirtualHost = _rabbitOptions.VHost,
            };
            this._exchangeName = _rabbitOptions.ExchangeName;
            if (!String.IsNullOrEmpty(_rabbitOptions.CertPath)){
                _connectionFactory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
                _connectionFactory.Ssl.AcceptablePolicyErrors = AcceptedSslPolicyErrors;
                // Assigned (not +=) to a callback that enforces AcceptablePolicyErrors. The previous
                // callback returned true unconditionally and so accepted any certificate at all.
                _connectionFactory.Ssl.CertificateValidationCallback = new RemoteCertificateValidationCallback(ValidateServerCertificate);
                _connectionFactory.Ssl.ServerName = _rabbitOptions.HostName;
                _connectionFactory.Ssl.CertPath = _rabbitOptions.CertPath;
                _connectionFactory.Ssl.CertPassphrase = _rabbitOptions.CertPass;
                _connectionFactory.Ssl.Version = SslProtocols.Tls12;
                _connectionFactory.Ssl.Enabled = true;
            }
            _connectionFactory.RequestedHeartbeat = HeartbeatInterval;
            _connectionFactory.AutomaticRecoveryEnabled = true; // enable automatic connection recovery
            ReconnectToRMQ();
            Utils.log.Info("ConnectToRabbitMQ () Connecting to RabbitMQ. rabbitMQenvironment = ");
        }
        catch (Exception ex){
            Utils.log.Error("Connection to RabbitMQ failed ! HostName = " + _rabbitOptions.HostName + " VirtualHost = " + _rabbitOptions.VHost);
            Utils.printException("ConnectToRMQ ()", ex);
        }
    }

    // Logs connection shutdowns only. Deliberately no reconnect here: automatic recovery
    // re-establishes this connection and its channels. The previous handler opened an extra
    // connection on every shutdown — including those triggered by our own Close()/Dispose() —
    // while the old auto-recovering one kept running, and none of those were ever closed.
    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info ("Connection broke!");
    }

    // Opens a connection if none exists yet, and a fresh channel if the current one is closed
    // while the connection is open. Returns true when a new channel was created.
    private bool ReconnectToRMQ(){
        if (_connection == null){
            _connection = _connectionFactory.CreateConnection();
            _connection.ConnectionShutdown += Connection_ConnectionShutdown;
        }
        if (!_connection.IsOpen){
            // Automatic recovery is in progress; opening a second connection would leak one.
            return false;
        }
        if (Channel == null || Channel.IsOpen == false){
            Channel?.Dispose();
            Channel = _connection.CreateModel();
            return true;
        }
        return false;
    }

    // True when both the connection and the channel can be used to publish.
    private bool IsReadyToPublish() =>
        _connection != null && _connection.IsOpen && Channel != null && Channel.IsOpen;

    // Accepts the server certificate only if every reported policy error is in AcceptedSslPolicyErrors.
    private static bool ValidateServerCertificate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
        return (sslPolicyErrors & ~AcceptedSslPolicyErrors) == SslPolicyErrors.None;
    }

    /// <summary>Closes the channel and the connection, if they exist.</summary>
    public void DisconnectFromRMQ () {
        Channel?.Close ();
        _connection?.Close ();
    }

    /// <summary>
    /// Closes and disposes the channel and the connection. Each step runs even if an earlier one
    /// fails, and failures are logged, never thrown.
    /// </summary>
    public void Dispose(){
        IModel channel = Channel;
        IConnection connection = _connection;
        Channel = null;
        _connection = null;
        if (channel != null){
            TryTeardown(() => channel.Close());
            TryTeardown(channel.Dispose);
        }
        if (connection != null){
            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            TryTeardown(() => connection.Close());
            TryTeardown(connection.Dispose);
        }
    }

    // Runs one teardown step, logging instead of throwing, so Dispose never throws and a failing
    // step (e.g. closing an already-closed channel) does not skip the remaining ones.
    private static void TryTeardown(Action step){
        try{
            step();
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    /// <summary>
    /// Publishes a persistent message to the configured exchange. When no usable connection or
    /// channel exists, it first tries to obtain one. If that fails (e.g. during automatic
    /// recovery), the message is logged and dropped instead of throwing.
    /// </summary>
    /// <param name="message">Message body.</param>
    /// <param name="routingKey">Routing key.</param>
    public void PublishMessages (byte [] message, string routingKey) {
        if (!IsReadyToPublish()) {
            Utils.log.Error ("PublishMessages(), Connect failed! this.conn == null || !conn.IsOpen ");
            try{
                ReconnectToRMQ();
            }
            catch (Exception ex){
                // Escaping here would fault the calling ActionBlock and stop all further publishing.
                Utils.printException("PublishMessages ()", ex);
            }
            if (!IsReadyToPublish())
                return;
            // Reconnected: fall through and publish instead of dropping the message.
        }
        var properties = Channel.CreateBasicProperties();
        properties.Persistent = true;
        Channel.BasicPublish (_exchangeName, routingKey, properties, message);
    }
}
现在有趣的是,如果我只将像“test”这样的小字符串发布到 RabbitMQ 到我的预定义队列中,我可以发布超过 1780 个 messages/second。
首先,您似乎阻塞了事件处理线程。
所以,我要做的是将事件处理与实际处理分离:
(未经测试!只是一个大纲!)
已删除错误代码
然后在 serviceInstance1
中,我会让 Publish
将订单排入 BlockingCollection 队列,专用线程正在其上等待。该线程将执行实际发送。因此,无论您在 Processor
中选择做什么,您都会将订单编组到该线程,并且所有订单都将解耦并按顺序排列。
您可能希望根据您的要求设置 BlockOptions。
请注意,这只是一个粗略的轮廓,不是完整的解决方案。您可能还想从那里开始并尽量减少字符串操作等。
编辑
从昨天开始我想到的一些想法没有特别的顺序:
- 放弃第一个过滤器以稍后过滤掉空的 JObject 集是否有益?
- 也许值得尝试用 System.Text.Json 代替 Newtonsoft?
- 有没有更有效的方法从 xml 到 json? (我在想“XSLT”,但真的不确定)
- 我建议使用 MemoryAnalyzer 安装 Benchmark.Net 来记录/证明您的更改具有积极影响。
- 不要忘记查看 DataFlowBockOptions 以调整管道的行为。
供参考:
- DataFlow
- Walkthrough: Creating a Dataflow Pipeline
- What's so wrong about using GC.Collect()?
回应问题中的编辑 3:
// Quoted from the question's Edit 3: the background task that drains concurrentQueue and
// publishes each token to RabbitMQ.
Task t2 = Task.Factory.StartNew(() =>
{
// Loops forever: there is no exit condition and no cancellation.
while (true) {
if (!concurrentQueue.IsEmpty)
{
JToken number;
while (concurrentQueue.TryDequeue(out number))
{
_rabbitMQ.PublishMessages(
Encoding.ASCII.GetBytes(number.ToString()),
"test"
);
}
} else
{
// Spin-wait: when the queue is empty, sleep 1 ms and poll again.
Thread.Sleep(1);
}
}
});
确实不是个好主意。
首先,我会把这部分放到负责发送的服务类中(即 serviceInstance1
—— 我不知道它的具体类型)。其次,你在混用 TPL 和 Thread.Sleep 的同时,用自旋等待做了一个紧密循环。这两点都是大忌。这也完全违背了阻塞队列的本意:线程应当阻塞,直到队列中有可用的元素。
也许现在完全放弃这部分是个更好的主意,让管道中的最后一个块执行 serviceInstance1.Publish
。这可能是过早的优化。
编辑 2
所以,昨天我做了一些实验,发现:
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
using System.Xml.Serialization;
using Newtonsoft.Json;
using System.Linq;
namespace DataFlowExperiment.PipelinesLib
{
/// <summary>
/// Two-stage TPL Dataflow pipeline. First, XML text is parsed into an XDocument
/// (IPipelineOneSteps.Start). Then one JSON string is produced per element selected by
/// IPipelineOneSteps.ToJson. Non-empty strings go to the result target; null or empty
/// strings are discarded.
/// </summary>
public class PipelineOne
{
    private readonly IPipelineOneSteps steps;
    private readonly TransformBlock<string, XDocument> startBlock; // XML text -> XDocument
    private readonly TransformManyBlock<XDocument, string> toJsonMessagesBlock; // XDocument -> JSON messages
    private readonly ITargetBlock<string> resultCallback;
    // True when the caller supplied a result target; false when results go to a NullTarget.
    private readonly bool hasResultCallback;

    /// <param name="steps">Implementation of the two stages.</param>
    /// <param name="resultCallback">
    /// Receives the JSON messages and is completed when the pipeline completes; when null, the
    /// messages are discarded.
    /// </param>
    public PipelineOne(IPipelineOneSteps steps, ITargetBlock<string> resultCallback = null)
    {
        this.steps = steps;
        startBlock = new TransformBlock<string, XDocument>(steps.Start);
        toJsonMessagesBlock = new TransformManyBlock<XDocument, string>(steps.ToJson);
        hasResultCallback = resultCallback != null;
        this.resultCallback = resultCallback ?? DataflowBlock.NullTarget<string>();

        var propagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };
        startBlock.LinkTo(toJsonMessagesBlock, propagateCompletion);
        toJsonMessagesBlock.LinkTo(this.resultCallback, propagateCompletion, x => !string.IsNullOrEmpty(x));
        // Catch-all for messages rejected by the predicate above; without it they would stay in
        // toJsonMessagesBlock's output buffer and prevent the block from completing.
        toJsonMessagesBlock.LinkTo(DataflowBlock.NullTarget<string>(), propagateCompletion);
    }

    /// <summary>Queues one XML payload. Payloads posted after Close() are declined and dropped.</summary>
    public void Post(string input)
    {
        startBlock.Post(input);
    }

    /// <summary>
    /// Stops accepting input. Returns a task that completes once all queued input has flowed
    /// through the pipeline and, when a result target was supplied, that target has completed.
    /// </summary>
    public Task Close()
    {
        startBlock.Complete();
        // A NullTarget ignores Complete() and its Completion task never finishes. So without a
        // caller-supplied target, wait on the last real block instead; otherwise the returned
        // task would never complete.
        return hasResultCallback ? resultCallback.Completion : toJsonMessagesBlock.Completion;
    }
}
/// <summary>The per-stage transformations used by the PipelineOne pipeline.</summary>
public interface IPipelineOneSteps
{
    /// <summary>Parses one raw XML payload into a document.</summary>
    XDocument Start(string input);

    /// <summary>Produces one JSON message per relevant element of <paramref name="doc"/>.</summary>
    IEnumerable<string> ToJson(XDocument doc);
}
/// <summary>
/// Default pipeline steps. Payloads are parsed with XDocument, then every ORDER child of the
/// root (in the "api-com" namespace) is serialized to JSON with Json.NET.
/// </summary>
public class PipelineOneSteps : IPipelineOneSteps
{
    // Default namespace of the API payloads (xmlns="api-com"), built once instead of per call.
    // The previous unused JsonSerializer field has been removed; the implicit public
    // parameterless constructor is unchanged.
    private static readonly XNamespace ApiNamespace = "api-com";

    /// <summary>Parses one XML payload.</summary>
    /// <exception cref="System.Xml.XmlException">The payload is not well-formed XML.</exception>
    public XDocument Start(string input) => XDocument.Parse(input);

    /// <summary>Lazily yields one JSON string per ORDER child of the document root.</summary>
    public IEnumerable<string> ToJson(XDocument doc)
    {
        foreach (XElement order in doc.Root.Elements(ApiNamespace + "ORDER"))
        {
            yield return JsonConvert.SerializeXNode(order);
        }
    }
}
}
此基准测试的结果:
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19041.867 (2004/?/20H1)
Intel Core i9-10885H CPU 2.40GHz, 1 CPU, 16 logical and 8 physical cores
.NET Core SDK=5.0.202
[Host] : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
DefaultJob : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
| Method | N | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
|---|---|---|---|---|---|---|---|---|
| PipeLineOneBenchmark | 1000 | 25.00 μs | 0.269 μs | 0.252 μs | - | - | - | - |
| PipeLineOneBenchmark | 100000 | 2,491.42 μs | 13.655 μs | 15.177 μs | - | - | - | - |
这与您的解决方案相似但不同。
不过,这让我觉得,实际问题出在别处。 (仍在努力并准备更新。)
我在想一个小工具,看看你的兔子是否太慢了,你有大量积聚:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace DataFlowExperiment.PipelinesLib
{
/// <summary>
/// Decouples producers from RabbitMQ. Messages posted to <see cref="Receiver"/> are queued and
/// sent in order by one dedicated thread. After every batch the thread reports the backlog size
/// to RabbitWrapperStats, so a broker that cannot keep up shows up as a growing queue.
/// </summary>
public class RabbitWrapper : IDisposable
{
    // Maximum number of messages sent between two backlog samples.
    private readonly int batchSize = 10;
    private readonly Thread senderThread;
    private readonly BlockingCollection<string> messages;
    private readonly ActionBlock<string> receiver;
    private readonly CancellationTokenSource stoppingToken;
    private readonly RabbitWrapperStats stats;
    private bool closed;

    /// <summary>
    /// Entry point for outgoing messages (e.g. link a pipeline's last block here). It was private
    /// before, which left no way to feed the wrapper.
    /// </summary>
    public ITargetBlock<string> Receiver => receiver;

    /// <summary>Creates the queue and starts the sender thread.</summary>
    public RabbitWrapper()
    {
        // Drop in your logging here
        stats = new RabbitWrapperStats(new Progress<string>(x => Console.WriteLine(x)));
        stoppingToken = new CancellationTokenSource();
        messages = new BlockingCollection<string>();
        receiver = new ActionBlock<string>(Receive);
        senderThread = new Thread(HandleQueue);
        senderThread.Start();
    }

    // ActionBlock callback: moves one message onto the sender thread's queue.
    private void Receive(string message)
    {
        messages.Add(message);
    }

    // Sender-thread loop: sends up to batchSize queued messages, samples the backlog, repeats.
    private void HandleQueue()
    {
        CancellationToken token = stoppingToken.Token;
        try
        {
            while (!token.IsCancellationRequested)
            {
                int batchIndex = 0;
                do {
                    string message = messages.Take(token);
                    if (!string.IsNullOrEmpty(message))
                    {
                        SendToRabbit(message);
                    }
                    batchIndex++;
                } while (!token.IsCancellationRequested &&
                         batchIndex < batchSize &&
                         messages.Count > 0);
                // Sample the backlog once per batch (at most batchSize messages).
                CheckStats(messages.Count);
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Close() cancelled a blocking Take(); this is the normal shutdown path. Without this
            // catch the exception would escape a dedicated Thread and terminate the process.
        }
    }

    private void SendToRabbit(string message)
    {
        // rabbit Publish goes here.
    }

    private void CheckStats(int count)
    {
        stats.CheckStats(count);
    }

    /// <summary>
    /// Stops accepting messages and stops the sender thread; messages still queued are not sent.
    /// Calling it again has no effect.
    /// </summary>
    public void Close()
    {
        if (closed)
            return;
        closed = true;
        receiver.Complete();
        try
        {
            // Let in-flight Receive calls finish before the queue can be disposed.
            receiver.Completion.Wait();
        }
        catch (AggregateException)
        {
            // A faulted receiver has nothing left to hand over; continue shutting down.
        }
        stoppingToken.Cancel();
        senderThread.Join();
    }

    /// <summary>Closes the wrapper and releases the cancellation source and the queue.</summary>
    public void Dispose()
    {
        Close();
        stoppingToken.Dispose();
        messages.Dispose();
    }
}
/// <summary>
/// Watches the sender queue's backlog. Through an IProgress sink it reports when a sample
/// crosses the warning or critical threshold, and when the mean of the last 10 samples falls
/// back below 75 % of the warning threshold after an alert.
/// </summary>
internal class RabbitWrapperStats
{
    // Rough thresholds; tune them for the real workload.
    private const int SizeWarn = 500000;
    private const int SizeCritical = SizeWarn * 2;

    // Ring buffer holding the 10 most recent queue-size samples.
    private readonly int[] samples = new int[10];
    private int nextSample;
    private int meanOfSamples;
    private int previousMean;
    // Direction of the mean (+1 rising, 0 flat, -1 falling); computed but not consumed yet.
    private int tendency;
    private bool hasWarned;
    private bool hasPanicked;
    private readonly IProgress<string> progress;

    public RabbitWrapperStats(IProgress<string> progress) => this.progress = progress;

    /// <summary>Records one backlog sample and emits at most one report for it.</summary>
    public void CheckStats(int queueSize)
    {
        RecordSample(queueSize);

        if (queueSize > SizeCritical && !hasPanicked)
        {
            hasPanicked = true;
            progress?.Report($"!! CRITICAL !! QueueSize = {queueSize}");
            return;
        }

        if (queueSize > SizeWarn && !hasWarned)
        {
            hasWarned = true;
            progress?.Report($"WARNING QueueSize = {queueSize}");
            return;
        }

        bool alertRaised = hasPanicked || hasWarned;
        if (alertRaised && meanOfSamples < SizeWarn * 0.75)
        {
            hasPanicked = false;
            hasWarned = false;
            progress?.Report($"INFO Mean size of last 10 Samples sinks below {SizeWarn * 0.75} : {meanOfSamples}");
        }
    }

    // Stores the sample in the ring buffer, then refreshes the mean and its direction.
    private void RecordSample(int value)
    {
        samples[nextSample] = value;
        nextSample = (nextSample + 1) % samples.Length;
        meanOfSamples = samples.Sum() / samples.Length;
        tendency = meanOfSamples.CompareTo(previousMean);
        previousMean = meanOfSamples;
    }
}
}
我想知道是否有人可以帮助解决以下情况:
我无法使用用 C# 编写并使用 .Net core 5.0 的 RabbitMQ Publisher 解决内存泄漏。
这是 csproj 文件:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
</PropertyGroup>
...
</Project>
我在虚拟机中运行一个 .NET 控制台应用程序,它通过 API(一个已注册的 64 位 DLL,以 COM 引用的方式引用)连接到服务器,从该服务器获取信息,然后尝试将这些信息发布到位于 AWS 云上的 RabbitMQ 机器(一个负载均衡器,后面是该 RMQ 实例的多个节点)。
访问代码中的API是通过以下方式完成的:
/// <summary>
/// Configures the API connection options and attaches this instance's handlers to the API events.
/// </summary>
private void SetUpApi () {
    // Apply the options first; whatever the API returns is written to the info log.
    var optionsReply = _api.SetAPIOptions ("<CONNECTIONOPTIONS><CALCULATED_PRICES Enabled='true' MaximumDepth='4'/></CONNECTIONOPTIONS>");
    Utils.log.Info (optionsReply);

    // Connection lifecycle callbacks, then the data feed itself.
    _api.OnServerConnect += OnServerConnect;
    _api.OnServerDisconnect += OnServerDisconnect;
    _api.OnNewData += OnNewData;
}
/// <summary>
/// Handler for the API's OnNewData event. Payloads containing "&lt;ORDER" are parsed and
/// published through ParseXMLAnswer; all other payloads are ignored. Exceptions are logged
/// and not rethrown, so a failed publish never propagates back into the API's event dispatch.
/// </summary>
/// <param name="strXML">Raw XML payload from the API; null or empty payloads are ignored.</param>
private void OnNewData(string strXML){
    if (string.IsNullOrEmpty(strXML) || !strXML.Contains("<ORDER"))
        return;
    try{
        ParseXMLAnswer(strXML, "OnNewData ()");
    }
    catch (Exception ex) {
        // The broker exception can arrive unwrapped (the logged trace shows AlreadyClosedException
        // thrown straight out of CreateModel), so test the exception itself as well as its inner
        // exception. Otherwise a full stack trace is printed for every message while the broker is down.
        Exception brokerFailure = FindBrokerFailure(ex);
        if (brokerFailure != null)
            Utils.log.Error("OnNewData () " + brokerFailure.GetType().FullName + " ");
        else
            Utils.printException("OnNewData ()", ex);
    }
}

// Returns ex, or its direct inner exception, when either reports a closed RabbitMQ
// connection/channel or an unreachable broker; otherwise returns null.
private static Exception FindBrokerFailure(Exception ex){
    if (ex is AlreadyClosedException || ex is BrokerUnreachableException)
        return ex;
    if (ex.InnerException is AlreadyClosedException || ex.InnerException is BrokerUnreachableException)
        return ex.InnerException;
    return null;
}
/// <summary>
/// Converts one API XML payload to JSON and publishes every ORDER element it contains
/// through SendOrderToRMQ.
/// </summary>
/// <param name="strOutputXML">XML payload; the sample messages use an APIDATA root element.</param>
/// <param name="caller">Name of the calling method; not used, kept for signature compatibility.</param>
/// <exception cref="XmlException">The payload is not well-formed XML.</exception>
private void ParseXMLAnswer(string strOutputXML, string caller) {
    XmlDocument doc = new XmlDocument();
    doc.LoadXml(strOutputXML);
    JObject o = JObject.Parse(JsonConvert.SerializeXmlNode(doc));

    // SerializeXmlNode yields an object for a single ORDER element and an array for several.
    // A missing APIDATA or ORDER node yields null, and nothing is published.
    JToken orderToken = o["APIDATA"]?["ORDER"];
    if (orderToken is JObject singleOrder) {
        SendOrderToRMQ(singleOrder);
    }
    else if (orderToken is JArray orders) {
        foreach (JObject order in orders.Children<JObject>()) {
            SendOrderToRMQ(order);
        }
    }
    // Deliberately no GC.Collect()/GC.WaitForPendingFinalizers(): a blocking full collection on
    // every message (hundreds per second) stalls this event thread. It also cannot free objects
    // that are still referenced; unreferenced ones are reclaimed by the GC on its own.
}
/// <summary>
/// Publishes one ORDER to RabbitMQ when its first sequence item is tracked in sequenceItemsHashed.
/// The routing key is
/// "{InstName}.{sequencesFromInstruments[FirstSequenceID]}.{FirstSequenceItemName}.{1-based position of the item}".
/// </summary>
/// <param name="order">JSON form of one ORDER element; XML attributes appear as "@"-prefixed keys.</param>
private void SendOrderToRMQ (JObject order){
    // An order without an INSTSPECIFIER object (or a null order) cannot be routed.
    if (!(order?["INSTSPECIFIER"] is JObject instrSpeciefier))
        return;

    string firstSeqID = instrSpeciefier.GetValue("@FirstSequenceID")?.ToString();
    string firstSeqItemID = instrSpeciefier.GetValue("@FirstSequenceItemID")?.ToString();
    if (firstSeqID == null || firstSeqItemID == null)
        return;

    // One dictionary lookup and one list scan, instead of ContainsKey + indexer + Contains + IndexOf.
    if (!sequenceItemsHashed.TryGetValue(firstSeqID, out var sequenceItems))
        return;
    int position = sequenceItems.IndexOf(firstSeqItemID);
    if (position < 0)
        return;

    string itemName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@FirstSequenceItemName").ToString());
    string instrumentName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@InstName").ToString());
    int index = position + 1;
    var binding = instrumentName + "." + sequencesFromInstruments[firstSeqID] + "." + itemName + "." + index;
    serviceInstance1.Publish(
        order.ToString(),
        _exchangeName,
        "",
        binding);
}
在营业高峰时段,我每秒从 API 收到大约 400–500 条消息。这些消息是 XML 格式的,一条消息可以包含多个订单,如下例所示:其中一个元素可能是插入(创建)订单的操作,另一个元素可能是删除某个特定订单的操作。
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER EngineID="0" PersistentOrderID="2791" ...>
<INSTSPECIFIER InstID="287" ... />
...
</ORDER>
<ORDER EngineID="0" PersistentOrderID="9840" ...>
<INSTSPECIFIER InstID="288" ... />
...
</ORDER>
RabbitMQ 服务器已配置,我使用 SSL(带有证书和密钥)连接到它。 我使用RabbitMQ.Client v6.2.1 连接到RMQ。 交换、队列和绑定已经在 RabbitMQ 中定义。我的 Producer 应用程序只连接到它并开始发布。
使用同步发布方法对我来说很重要,因为消息的顺序至关重要。例如,一条消息要求创建订单,紧随其后的另一条消息要求删除同一个订单。如果我使用异步方法发布到 RMQ,删除操作可能会先于插入操作到达。
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Insert" ...>
...
</ORDER>
删除消息:
<?xml version="1.0" encoding="utf-16"?>
<APIDATA xmlns="api-com">
<ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Remove" ...>
...
</ORDER>
我使用以下方法发布到 RMQ:对象池(Microsoft 提供了一个名为 Microsoft.Extensions.ObjectPool 的包)- 此处描述的方法 - https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/ .
我在这里使用以下代码:
/// <summary>
/// Publishes messages on RabbitMQ channels borrowed from a DefaultObjectPool, so each publish
/// has exclusive use of its channel for the duration of the call.
/// </summary>
class RabbitManager : IRabbitManager
{
    private readonly DefaultObjectPool<IModel> _objectPool;

    /// <summary>Creates the channel pool, retaining at most two idle channels per logical processor.</summary>
    /// <param name="objectPolicy">Creates channels and decides whether a returned channel is kept.</param>
    public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy){
        // NOTE(review): DefaultObjectPool appears to drop (not dispose) objects returned while the
        // pool is already full. If more channels than this are ever in use concurrently, the
        // surplus ones stay open on the connection — confirm against the ObjectPool version used.
        _objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
    }

    /// <summary>
    /// Encodes message.ToString() as UTF-8 and publishes it as a non-persistent
    /// "application/json" message. A null message is ignored.
    /// </summary>
    /// <param name="message">Payload; its ToString() result is sent.</param>
    /// <param name="exchangeName">Target exchange.</param>
    /// <param name="exchangeType">Not used; kept for IRabbitManager compatibility.</param>
    /// <param name="routeKey">Routing key.</param>
    /// <remarks>
    /// Client exceptions propagate unchanged with their original stack trace (the previous
    /// <c>catch { throw ex; }</c> reset it).
    /// </remarks>
    public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey) where T : class {
        if (message == null)
            return;
        var channel = _objectPool.Get();
        try{
            var sendBytes = Encoding.UTF8.GetBytes(message.ToString());
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            // NOTE(review): AMQP 0-9-1 defines the timestamp property in seconds; this sends
            // milliseconds. Left unchanged because consumers may already rely on it.
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
            channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);
        }
        finally {
            // Always hand the channel back, even after a failure; with RabbitModelPooledObjectPolicy
            // a channel that is no longer open is disposed instead of being pooled.
            _objectPool.Return(channel);
        }
    }
}
/// <summary>
/// Object-pool policy that creates RabbitMQ channels on one shared, auto-recovering connection
/// and keeps only channels that are still open when they are returned.
/// </summary>
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
    // RabbitMQ's heartbeat guide recommends roughly 5-20 s; values below 5 s are prone to
    // false-positive disconnects. NOTE(review): the previous 1 s across a WAN/load balancer may
    // contribute to the "End of stream" shutdowns in the log — confirm against broker logs.
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(10);

    // TLS policy errors tolerated for the broker certificate; any other error fails the handshake.
    private const SslPolicyErrors AcceptedSslPolicyErrors =
        SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;

    private readonly RabbitOptions _options;
    private readonly IConnection _connection;

    /// <summary>Stores the options and opens the shared connection immediately.</summary>
    /// <param name="_options">Broker host, credentials, virtual host and optional TLS certificate settings.</param>
    public RabbitModelPooledObjectPolicy(RabbitOptions _options){
        this._options = _options;
        _connection = GetConnection();
    }

    // Builds a connection factory from _options (TLS only when CertPath is set) and connects.
    private IConnection GetConnection() {
        var factory = new ConnectionFactory() {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            VirtualHost = _options.VHost,
        };
        if (!String.IsNullOrEmpty(_options.CertPath))
        {
            factory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
            factory.Ssl.AcceptablePolicyErrors = AcceptedSslPolicyErrors;
            // Assigned (not +=) to a callback that enforces AcceptablePolicyErrors. The previous
            // callback returned true unconditionally and so accepted any certificate at all.
            factory.Ssl.CertificateValidationCallback = new RemoteCertificateValidationCallback(ValidateServerCertificate);
            factory.Ssl.ServerName = _options.HostName;
            factory.Ssl.CertPath = _options.CertPath;
            factory.Ssl.CertPassphrase = _options.CertPass;
            factory.Ssl.Version = SslProtocols.Tls12;
            factory.Ssl.Enabled = true;
        }
        factory.RequestedHeartbeat = HeartbeatInterval;
        factory.AutomaticRecoveryEnabled = true; // enable automatic connection recovery
        factory.RequestedChannelMax = 32;
        // Named "connection" so it no longer shadows the _connection field.
        var connection = factory.CreateConnection();
        connection.ConnectionShutdown += Connection_ConnectionShutdown;
        return connection;
    }

    // Logs connection shutdowns; reconnecting is left to the client's automatic recovery.
    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info("Connection broke!");
    }

    // Accepts the server certificate only if every reported policy error is in AcceptedSslPolicyErrors.
    private static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors){
        return (sslPolicyErrors & ~AcceptedSslPolicyErrors) == SslPolicyErrors.None;
    }

    /// <summary>Opens a new channel on the shared connection.</summary>
    public IModel Create(){
        return _connection.CreateModel();
    }

    /// <summary>Keeps open channels; disposes closed or null ones and tells the pool to drop them.</summary>
    /// <returns>true when the channel may be pooled again.</returns>
    public bool Return(IModel obj) {
        if (obj != null && obj.IsOpen)
            return true;
        obj?.Dispose();
        return false;
    }
}
下面是有问题的屏幕截图 - 内存不断增加:
这是在上面的屏幕截图之后拍摄的内存快照的堆栈跟踪:
截取上面的屏幕截图后,我的程序在控制台中收到以下错误消息:
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
这导致程序最终崩溃:
我做了什么来防止内存泄漏 :
- 各个类都实现了 IDisposable 接口:
/// <summary>
/// Releases the timer and detaches this instance from the API's events.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Dispose-pattern worker.
/// </summary>
/// <param name="disposing">
/// true when called from <see cref="Dispose()"/>; false when called from a finalizer, in which case
/// managed objects must not be touched.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (disposing)
    {
        // free managed resources
        _onTimePerHour?.Dispose();
        if (_api != null)
        {
            // Detach every handler that SetUpApi attaches; a handler left attached keeps this
            // instance reachable through _api and therefore never collected.
            _api.OnServerConnect -= OnServerConnect;
            _api.OnServerDisconnect -= OnServerDisconnect;
            _api.OnNewData -= OnNewData;
        }
    }
    // free native resources if there are any.
}
- 收到每条消息后强制进行垃圾回收:
// Excerpt of ParseXMLAnswer; "..." marks code omitted here (the full method appears earlier in the question).
private void ParseXMLAnswer(string strOutputXML, string caller) {
...
// Not needed for collection: doc is unreachable once the method returns anyway.
doc = null;
// NOTE(review): a full, blocking collection per incoming message is expensive and only delays
// the symptom; it cannot free objects that are still referenced elsewhere (e.g. queued messages).
GC.Collect();
GC.WaitForPendingFinalizers();
}
这有一点帮助:现在内存要经过更长时间才会明显增长。
- 我使用了 Visual Studio 的 ReSharper 插件来更好地理解内存问题的堆栈跟踪,但帮助不大。
我觉得是什么问题 :
RabbitMQ Producer 应用程序每秒获取许多消息,然后对这些消息进行解析,拆分为多个 JSON 消息并使用相同的通道发送到 RMQ。可能会出现以下情况:
- 我只在一个 RMQ 通道(channel)上发布,也许应该使用多个通道(一个连接、多个通道);
- 我收到的消息多于我可以使用 RabbitMQ.Client .net 库通过 RMQ 解析和发送的消息
- 我在内存中保留了一些未释放的对象(可能是消息)的引用;
以前有人遇到过这个问题吗?我在任何地方都找不到关于这个 “SingleProducerSingleConsumerQueue+Segment” 对象的信息。
有人知道如何更深入地分析这个问题吗?
非常感谢!
编辑 1
我想解决这个内存问题需要更多信息。
我有几个消费者使用来自 RabbitMQ 的数据(比如 NodeJS 和 python 应用程序)。因此,我需要以通用方式设计 RabbitMQ 生产者,因为每个消费者需要不同的数据。每次我有一个新的消费者应用程序时,我都无法修改和重新启动我的 RabbitMQ 生产者。所以我需要以通用的方式发布我的消息。
例如,每个消费者都有自己的专用队列,具有专用绑定。假设我有带有队列 cons1 和绑定的 consumer1 :
- marketName.productName.*.1(productName对应天数).
此绑定是动态的,目前它对应于星期一 (04.April),但明天它将对应于星期二 (05.April)。
所以我需要用以下字典将市场名称和产品名称存储在内存中:
private static readonly Dictionary<string, List<string>> sequenceItemsHashed = new Dictionary<string, List<string>>();
// Per the author's description, this cache holds productNames (sequenceItemsHashed holds marketNames);
// the exact key/value meaning is not shown here — TODO confirm.
// NOTE(review): Dictionary<,> is not thread-safe; if it is touched from several pipeline threads, lock it or use ConcurrentDictionary.
private static readonly Dictionary<string, string> sequencesFromInstruments = new Dictionary<string, string>();
需要说明的是,在我的逻辑中 sequenceItemsHashed 对应 marketNames,sequencesFromInstruments 对应 productNames。
通过这种方式,我将所有消息发送到 RMQ,然后使用绑定在 RMQ 中进行排序。
编辑 2
据我了解,为了解决我的问题,我需要类似下面这样的架构:
即:到 RMQ 服务器只建立一个连接,由多个线程使用,每个线程拥有自己的通道。
编辑 3
已成功实施管道、ConcurrentQueue 和推送到 RMQ 的消费者线程,但仍然存在内存问题:
// Edit 3 pipeline stages: filter ORDER messages -> parse XML into a JObject -> split into ORDER tokens -> enqueue.
private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, JObject> xmlParser;
//private readonly TransformBlock<XmlDocument, JObject> xmlToJsonTransformer;
private readonly TransformManyBlock<JObject, JToken> jsonOrderFactory;
// Final stage: its SendMessage delegate only enqueues into concurrentQueue; a separate task publishes.
private readonly ActionBlock<JToken> messageSender;
// Hand-off buffer between the pipeline and the publishing task. It is unbounded, so it grows
// without limit whenever publishing is slower than the incoming data.
ConcurrentQueue<JToken> concurrentQueue = new ConcurrentQueue<JToken>();
// Edit 3 constructor; "..." marks code omitted in the question.
public Service1 (string [] args) {
...
// setup pipeline blocks
// With default options every block runs its delegate on one message at a time and has an unbounded input buffer.
orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
xmlParser = new TransformBlock<string, JObject>(ParseXml);
jsonOrderFactory = new TransformManyBlock<JObject, JToken>(CreateOrderMessages);
messageSender = new ActionBlock<JToken>(SendMessage);
// build your pipeline
// NOTE(review): only the last link sets PropagateCompletion, so completing orderFilter would never complete messageSender.
// NOTE(review): an exception thrown by any block's delegate faults that block, after which it declines all further messages.
// Non-empty results (ORDER messages) go to xmlParser; everything else is dropped by the NullTarget link.
orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
xmlParser.LinkTo(jsonOrderFactory);
jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });
// Publisher task: drains concurrentQueue and publishes each token with routing key "test".
// NOTE(review): the loop never exits and, when idle, polls with Thread.Sleep(1) on a thread-pool thread
// (a spin-wait). A BlockingCollection<T> consumer on a dedicated thread would block instead.
// Encoding.ASCII replaces every non-ASCII character with '?'.
Task t2 = Task.Factory.StartNew(() =>
{
while (true) {
if (!concurrentQueue.IsEmpty)
{
JToken number;
while (concurrentQueue.TryDequeue(out number))
{
_rabbitMQ.PublishMessages(
Encoding.ASCII.GetBytes(number.ToString()),
"test"
);
}
} else
{
Thread.Sleep(1);
}
}
});
...
}
/// <summary>
/// First pipeline stage: passes messages that contain an ORDER element through unchanged and maps
/// everything else to null (null results are routed to a NullTarget by the pipeline links).
/// </summary>
/// <param name="strXml">Raw XML received from the API; may be null.</param>
/// <returns><paramref name="strXml"/> if it contains "&lt;ORDER", otherwise null.</returns>
private string FilterIncomingMessages(string strXml){
    // A null input must not throw: an exception here would fault the block and stop the whole pipeline.
    if (strXml != null && strXml.Contains("<ORDER", StringComparison.Ordinal)) return strXml;
    return null;
}
/// <summary>
/// Pipeline stage: loads the XML text and converts it, via its JSON representation, into a JObject.
/// </summary>
/// <param name="strXml">Well-formed XML text.</param>
/// <returns>The JSON object equivalent of the XML document.</returns>
private JObject ParseXml(string strXml){
    var document = new XmlDocument();
    document.LoadXml(strXml);
    return JObject.Parse(JsonConvert.SerializeXmlNode(document));
}
/// <summary>
/// Pipeline stage: splits one parsed API message into its ORDER tokens.
/// </summary>
/// <param name="o">JSON produced by ParseXml; may be null.</param>
/// <returns>
/// The single ORDER object, every element of an ORDER array, or an empty sequence when the
/// "GV8APIDATA" root or its "ORDER" property is missing or has another shape.
/// </returns>
private IEnumerable<JToken> CreateOrderMessages(JObject o){
    var myList = new List<JToken>();
    // Probe with TryGetValue instead of chained indexers: o["GV8APIDATA"]["ORDER"] is null when ORDER is
    // missing (and throws when the root is not an object), and any exception here faults the block.
    if (o != null
        && o.TryGetValue("GV8APIDATA", out JToken root)
        && root is JObject rootObject
        && rootObject.TryGetValue("ORDER", out JToken orderToken)){
        switch (orderToken.Type){
            case JTokenType.Object:
                myList.Add(orderToken);
                break;
            case JTokenType.Array:
                myList.AddRange(orderToken.Children());
                break;
        }
    }
    return myList;
}
/// <summary>Last pipeline stage: hands the ORDER token to the publishing task through concurrentQueue.</summary>
/// <param name="order">The ORDER token to publish.</param>
private void SendMessage(JToken order) => concurrentQueue.Enqueue(order);
新的解决方案有助于将逻辑分成几个小部分,但我的内存仍然在不断增加。
编辑 4
考虑到@Fildor 的回答,我做了以下事情:
- 我不再先把 XML 转换成 JObject 再转换为 JSON 字符串,而是用 XmlSerializer 直接把 XML 反序列化为对象(见下面的 OrdersResponse / Order);
我删除了 Thread 和 ConcurrentQueue 的部分,我直接在最后一个 ActionBlock 中发布。
这解决了我的内存泄漏问题,但还有其他问题,例如:
- 如果消息较大,我每秒只能发布大约 120 条消息;如果只发布简单的字符串 “test”,速率可达每秒约 1780 条消息。
// Edit 4 constructor; "..." marks code omitted in the question.
public Service1 (string [] args) {
...
// setup pipeline blocks
// With default options every block processes one message at a time (so order is preserved) and has an
// unbounded input buffer, so any backlog accumulates in memory when publishing falls behind.
orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
xmlParser = new TransformBlock<string, OrdersResponse>(ParseXml);
jsonOrderFactory = new TransformManyBlock<OrdersResponse, Order>(CreateOrderMessages);
// Publishes directly from this stage (SendMessage calls _rabbitMQ.PublishMessages).
messageSender = new ActionBlock<Order>(SendMessage);
// build your pipeline
// NOTE(review): only the last link sets PropagateCompletion; orderFilter->xmlParser and xmlParser->jsonOrderFactory
// do not, so completing orderFilter would never complete messageSender.
// NOTE(review): an exception thrown by any block's delegate faults that block, which then declines all further messages.
// Non-empty results (ORDER messages) go to xmlParser; everything else is dropped by the NullTarget link.
orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
xmlParser.LinkTo(jsonOrderFactory);
jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });
RunAsConsole(args);
}
// Edit 4 pipeline stages: filter ORDER messages -> XmlSerializer parse -> split into Orders -> publish.
private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, OrdersResponse> xmlParser;
private readonly TransformManyBlock<OrdersResponse, Order> jsonOrderFactory;
private readonly ActionBlock<Order> messageSender;
/// <summary>
/// API event handler: forwards the raw XML into the pipeline without waiting for it to be processed.
/// </summary>
/// <param name="strXML">XML payload delivered by the API.</param>
/// <remarks>The result of Post is not checked, so a message declined by orderFilter is silently dropped.</remarks>
private void OnNewData(string strXML) => orderFilter.Post(strXML);
/// <summary>
/// First pipeline stage: passes messages that contain an ORDER element through unchanged and maps
/// everything else to null (null results are routed to a NullTarget by the pipeline links).
/// </summary>
/// <param name="strXml">Raw XML received from the API; may be null.</param>
/// <returns><paramref name="strXml"/> if it contains "&lt;ORDER", otherwise null.</returns>
private string FilterIncomingMessages(string strXml){
    // A null input must not throw: an exception here would fault the block and stop the whole pipeline.
    if (strXml != null && strXml.Contains("<ORDER", StringComparison.Ordinal)) return strXml;
    return null;
}
/// <summary>Pipeline stage: deserializes the raw XML into an <see cref="OrdersResponse"/>.</summary>
/// <param name="strXml">XML text that passed the ORDER filter.</param>
/// <returns>The deserialized response.</returns>
private OrdersResponse ParseXml(string strXml) => DeserializeOrdersFromXML(strXml);
// XmlSerializer is thread-safe, so one instance is shared instead of constructing a new serializer
// (with its type-mapping work) for every incoming message.
private static readonly XmlSerializer OrdersResponseSerializer = new XmlSerializer(typeof(OrdersResponse));

/// <summary>
/// Deserializes an API XML message into an <see cref="OrdersResponse"/>.
/// </summary>
/// <param name="strOutputXML">XML text whose root element maps to OrdersResponse.</param>
/// <returns>The deserialized object.</returns>
/// <exception cref="InvalidOperationException">The XML is malformed or does not match OrdersResponse.</exception>
private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
    using (TextReader reader = new StringReader(strOutputXML)) {
        // Disposal at the end of the using block closes the reader; no explicit Close() is needed.
        return (OrdersResponse)OrdersResponseSerializer.Deserialize(reader);
    }
}
/// <summary>
/// Pipeline stage: expands a deserialized response into its individual orders.
/// </summary>
/// <param name="o">Deserialized response; may be null.</param>
/// <returns>The response's orders, or an empty sequence when the response or its order list is null.</returns>
private IEnumerable<Order> CreateOrderMessages(OrdersResponse o){
    // A NullReferenceException here would fault the TransformManyBlock and stop the pipeline.
    return o?.orders ?? (IEnumerable<Order>)Array.Empty<Order>();
}
/// <summary>
/// Last pipeline stage: publishes one order to RabbitMQ with routing key "test".
/// </summary>
/// <param name="order">Order to publish; its ToString() text becomes the message body.</param>
private void SendMessage(Order order) {
    // UTF-8 instead of ASCII: ASCII encoding turns every non-ASCII character into '?'.
    // NOTE(review): the visible Order members do not override ToString(); without an override the body is
    // the type name rather than the order data — confirm an override exists or serialize explicitly.
    _rabbitMQ.PublishMessages(
        Encoding.UTF8.GetBytes(order.ToString()),
        "test"
    );
}
ORDER 对象看起来像:
// XML-serializable model of one <ORDER> element; "..." marks members omitted in the question.
// NOTE(review): [Serializable] is not required by XmlSerializer (it needs a public type with a public parameterless constructor).
[Serializable()]
[XmlRoot (ElementName = "ORDER")]
// NOTE(review): the visible members are strings and a nested object, so IDisposable frees nothing the GC
// would not reclaim anyway; nulling properties does not help once the Order itself is unreachable.
public class Order : IDisposable {
public void Dispose()
{
EngineID = null;
PersistentOrderID = null;
...
// NOTE(review): throws NullReferenceException when InstrumentSpecifier is null (e.g. the element is absent).
InstrumentSpecifier.Dispose();
InstrumentSpecifier = null;
// No finalizer is visible in this class, so this call has no effect here.
GC.SuppressFinalize(this);
}
[XmlAttribute (AttributeName = "EngineID")]
public string EngineID { get; set; }
[XmlAttribute (AttributeName = "PersistentOrderID")]
public string PersistentOrderID { get; set; }
...
[XmlElement(ElementName = "INSTSPECIFIER")]
public InstrumentSpecifier InstrumentSpecifier { get; set; }
}
还有我的新 RabbitMQ class :
/// <summary>
/// Owns one RabbitMQ connection and one channel and publishes persistent messages to a single exchange.
/// </summary>
/// <remarks>
/// A channel (<see cref="IModel"/>) must not be used by several threads at once; call
/// <see cref="PublishMessages"/> from one thread at a time (e.g. an ActionBlock with default options).
/// </remarks>
public class RMQ : IDisposable {
    private IConnection _connection;
    public IModel Channel { get; private set; }
    private readonly ConnectionFactory _connectionFactory;
    private readonly string _exchangeName;
    // Set by Dispose(); afterwards no new connection may be opened.
    private bool _disposed;

    /// <summary>
    /// Configures the connection factory (TLS when a certificate path is set) and opens the connection and
    /// channel. Failures are logged rather than thrown; PublishMessages tries to connect again later.
    /// </summary>
    /// <param name="_rabbitOptions">Broker address, credentials, exchange name and optional TLS settings.</param>
    public RMQ (RabbitOptions _rabbitOptions){
        try{
            // _connectionFactory initialization
            _connectionFactory = new ConnectionFactory()
            {
                HostName = _rabbitOptions.HostName,
                UserName = _rabbitOptions.UserName,
                Password = _rabbitOptions.Password,
                VirtualHost = _rabbitOptions.VHost,
            };
            this._exchangeName = _rabbitOptions.ExchangeName;
            if (!String.IsNullOrEmpty(_rabbitOptions.CertPath)){
                _connectionFactory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
                _connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
                _connectionFactory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
                _connectionFactory.Ssl.ServerName = _rabbitOptions.HostName;
                _connectionFactory.Ssl.CertPath = _rabbitOptions.CertPath;
                _connectionFactory.Ssl.CertPassphrase = _rabbitOptions.CertPass;
                _connectionFactory.Ssl.Version = SslProtocols.Tls12;
                _connectionFactory.Ssl.Enabled = true;
            }
            // NOTE(review): RabbitMQ's heartbeat guide says values below 5 s are likely to cause false positives;
            // a 1 s heartbeat can drop healthy connections ("End of stream").
            _connectionFactory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
            _connectionFactory.AutomaticRecoveryEnabled = true; // enable automatic connection recovery
            //_connectionFactory.RequestedChannelMax = 10;
            ReconnectToRMQ();
            Utils.log.Info("ConnectToRabbitMQ () Connecting to RabbitMQ. rabbitMQenvironment = ");
        }
        catch (Exception ex){
            Utils.log.Error("Connection to RabbitMQ failed ! HostName = " + _rabbitOptions?.HostName + " VirtualHost = " + _rabbitOptions?.VHost);
            Utils.printException("ConnectToRMQ ()", ex);
        }
    }

    // Logs connection loss and reconnects manually only when nothing else will.
    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info ("Connection broke!");
        // Skip the manual reconnect when:
        //  - we are disposing, or closed the connection ourselves (Initiator == Application), or
        //  - automatic recovery is enabled: the client is already re-opening this connection, and creating a
        //    second connection here would leak the first one, which keeps recovering in the background.
        if (_disposed || e.Initiator == ShutdownInitiator.Application || _connectionFactory.AutomaticRecoveryEnabled)
            return;
        try{
            if (ReconnectToRMQ()){
                Utils.log.Info("Connected!");
            }
        }
        catch (Exception ex){
            Utils.log.Info("Connect failed!" + ex.Message);
        }
    }

    /// <summary>
    /// Ensures an open connection and channel, disposing and replacing any that are closed.
    /// </summary>
    /// <returns>true when a new channel was created; false when the existing channel was still open.</returns>
    private bool ReconnectToRMQ(){
        if (_connection == null || _connection.IsOpen == false){
            // Release the dead connection (and its channel) before replacing it; otherwise the old
            // objects stay alive and leak.
            ReleaseChannel();
            ReleaseConnection();
            _connection = _connectionFactory.CreateConnection();
            _connection.ConnectionShutdown += Connection_ConnectionShutdown;
        }
        if (Channel == null || Channel.IsOpen == false){
            ReleaseChannel();
            Channel = _connection.CreateModel();
            return true;
        }
        return false;
    }

    // Disposes the current channel (if any) and clears Channel, logging instead of throwing.
    private void ReleaseChannel(){
        var channel = Channel;
        Channel = null;
        if (channel == null)
            return;
        try{
            channel.Dispose();
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    // Detaches the shutdown handler, disposes the current connection (if any) and clears _connection.
    private void ReleaseConnection(){
        var connection = _connection;
        _connection = null;
        if (connection == null)
            return;
        try{
            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            connection.Dispose();
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    // True when both the connection and the channel can be used for publishing.
    private bool IsReady => _connection != null && _connection.IsOpen && Channel != null && Channel.IsOpen;

    // NOTE(review): returns true unconditionally, so any server certificate is accepted and TLS does not
    // authenticate the broker. Consider `return sslPolicyErrors == SslPolicyErrors.None;` or certificate pinning.
    private bool ValidateServerCertificate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
        return true;
    }

    /// <summary>Closes the channel and the connection; either may be null if connecting failed.</summary>
    public void DisconnectFromRMQ () {
        // Closing raises ConnectionShutdown with Initiator == Application, which the handler ignores.
        Channel?.Close ();
        _connection?.Close ();
    }

    /// <summary>Closes and disposes the channel and connection. Safe to call more than once.</summary>
    public void Dispose(){
        if (_disposed)
            return;
        // Set first so the ConnectionShutdown raised by Close() below cannot open a new connection.
        _disposed = true;
        try{
            Channel?.Close();
            _connection?.Close();
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
        // Dispose even when a graceful Close() failed (e.g. the channel was already closed).
        ReleaseChannel();
        ReleaseConnection();
    }

    /// <summary>
    /// Publishes a persistent message to the configured exchange, reconnecting first if needed.
    /// </summary>
    /// <param name="message">Message body.</param>
    /// <param name="routingKey">Routing key evaluated by the exchange bindings.</param>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    public void PublishMessages (byte [] message, string routingKey) {
        if (_disposed)
            throw new ObjectDisposedException(nameof(RMQ));
        if (!IsReady) {
            Utils.log.Error ("PublishMessages(), Connect failed! this.conn == null || !conn.IsOpen ");
            ReconnectToRMQ();
            // Publish after a successful reconnect instead of silently dropping the message;
            // if there is still no usable channel, the message is dropped as before.
            if (!IsReady)
                return;
        }
        var properties = Channel.CreateBasicProperties();
        properties.Persistent = true;
        Channel.BasicPublish (_exchangeName, routingKey, properties, message);
    }
}
有趣的是,如果我只向 RabbitMQ 中预先定义的队列发布像 “test” 这样的短字符串,每秒可以发布超过 1780 条消息。
首先,您似乎阻塞了事件处理线程。 所以,我要做的是将事件处理与实际处理分离:
(未经测试!只是一个大纲!)
(此处原有的错误代码已删除)
然后在 serviceInstance1 中,我会让 Publish 只把订单放入一个 BlockingCollection 队列,由一个专用线程在该队列上等待并执行实际发送。这样,无论你在 Processor 中做什么,订单都会被编组(marshal)到这个线程上,从而与事件处理解耦,并保持原有顺序。
您可能希望根据您的要求设置 BlockOptions。
请注意,这只是一个粗略的轮廓,不是完整的解决方案。您可能还想从那里开始并尽量减少字符串操作等。
编辑
从昨天开始我想到的一些想法没有特别的顺序:
- 放弃第一个过滤器以稍后过滤掉空的 JObject 集是否有益?
- 也许值得尝试用 System.Text.Json 代替 Newtonsoft?
- 有没有更有效的方法从 xml 到 json? (我在想“XSLT”,但真的不确定)
- 我建议安装 BenchmarkDotNet 并启用它的 MemoryDiagnoser,以记录/证明你的改动确实带来了正面效果。
- 别忘了查看 DataflowBlockOptions(例如 BoundedCapacity、MaxDegreeOfParallelism)来调整管道的行为。
供参考:
- DataFlow
- Walkthrough: Creating a Dataflow Pipeline
- What's so wrong about using GC.Collect()?
回应问题中的编辑 3:
// Publisher task quoted from the question's Edit 3: drains concurrentQueue forever.
// NOTE(review): there is no cancellation or completion check, so the loop never ends.
Task t2 = Task.Factory.StartNew(() =>
{
while (true) {
if (!concurrentQueue.IsEmpty)
{
JToken number;
// Publish everything currently queued with routing key "test".
while (concurrentQueue.TryDequeue(out number))
{
_rabbitMQ.PublishMessages(
Encoding.ASCII.GetBytes(number.ToString()),
"test"
);
}
} else
{
// Idle: sleep briefly and poll again, i.e. a spin-wait that occupies a thread-pool thread.
Thread.Sleep(1);
}
}
});
确实不是个好主意。
首先,我会把这部分放到负责发送的服务类中(serviceInstance1——我不知道它的具体类型)。其次,你在一个紧密循环里用 Thread.Sleep 做自旋等待,并且把 TPL 和 Thread.Sleep 混在一起用,这是两个大忌。这也完全违背了阻塞队列的用意:线程应当阻塞,直到队列中有可用的项。
也许现在更好的做法是完全去掉这部分,直接让管道中的最后一个块调用 serviceInstance1.Publish。这部分很可能属于过早优化。
编辑 2
所以,昨天我做了一些实验,发现:
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
using System.Xml.Serialization;
using Newtonsoft.Json;
using System.Linq;
namespace DataFlowExperiment.PipelinesLib
{
    /// <summary>
    /// Two-stage TPL Dataflow pipeline: parses XML strings into <see cref="XDocument"/>s, converts each document
    /// into JSON messages and forwards the non-empty ones to a result target.
    /// </summary>
    /// <remarks>
    /// NOTE(review): an exception from the steps (e.g. malformed XML in Start) faults the pipeline;
    /// later posts are then declined.
    /// </remarks>
    public class PipelineOne
    {
        private readonly IPipelineOneSteps steps;
        private readonly TransformBlock<string, XDocument> startBlock; // XML text -> XDocument
        private readonly TransformManyBlock<XDocument, string> toJsonMessagesBlock; // XDocument -> JSON messages
        private readonly ITargetBlock<string> resultCallback;
        // True when the caller supplied a result target; false when results go to a NullTarget.
        private readonly bool hasResultCallback;

        /// <summary>Creates and links the pipeline blocks.</summary>
        /// <param name="steps">Implementation of the individual steps.</param>
        /// <param name="resultCallback">Receives every non-empty JSON message; when null, results are discarded.</param>
        public PipelineOne(IPipelineOneSteps steps, ITargetBlock<string> resultCallback = null)
        {
            this.steps = steps;
            startBlock = new TransformBlock<string, XDocument>(steps.Start);
            toJsonMessagesBlock = new TransformManyBlock<XDocument, string>(steps.ToJson);
            hasResultCallback = resultCallback != null;
            this.resultCallback = resultCallback ?? DataflowBlock.NullTarget<string>();
            startBlock.LinkTo(toJsonMessagesBlock, new DataflowLinkOptions { PropagateCompletion = true });
            toJsonMessagesBlock.LinkTo(this.resultCallback, new DataflowLinkOptions { PropagateCompletion = true }, x => !string.IsNullOrEmpty(x));
            // Catch-all for messages rejected by the filter above; without it they would stay in the output
            // buffer and block every message behind them.
            toJsonMessagesBlock.LinkTo(DataflowBlock.NullTarget<string>(), new DataflowLinkOptions { PropagateCompletion = true });
        }

        /// <summary>Queues one XML document; it is dropped if the pipeline has already completed or faulted.</summary>
        /// <param name="input">XML text.</param>
        public void Post(string input)
        {
            startBlock.Post(input);
        }

        /// <summary>Signals that no more input will be posted.</summary>
        /// <returns>
        /// A task that completes once all messages have been processed. This is the result target's completion
        /// when one was supplied; otherwise it is the JSON stage's completion, because a NullTarget's Completion
        /// never finishes and awaiting it would hang forever.
        /// </returns>
        public Task Close()
        {
            startBlock.Complete();
            return hasResultCallback ? resultCallback.Completion : toJsonMessagesBlock.Completion;
        }
    }

    /// <summary>The transformation steps executed by <see cref="PipelineOne"/>.</summary>
    public interface IPipelineOneSteps
    {
        /// <summary>Parses XML text into a document.</summary>
        public XDocument Start(string input);

        /// <summary>Produces the JSON messages for one document.</summary>
        public IEnumerable<string> ToJson(XDocument doc);
    }

    /// <summary>Default steps: XDocument parsing and one JSON string per ORDER element.</summary>
    public class PipelineOneSteps : IPipelineOneSteps
    {
        // XML namespace of the API messages (xmlns="api-com").
        private static readonly XNamespace ApiNamespace = "api-com";

        /// <inheritdoc/>
        /// <exception cref="System.Xml.XmlException">The input is not well-formed XML.</exception>
        public XDocument Start(string input)
        {
            return XDocument.Parse(input);
        }

        /// <inheritdoc/>
        /// <remarks>Only direct ORDER children of the root element are converted; evaluation is deferred.</remarks>
        public IEnumerable<string> ToJson(XDocument doc)
        {
            return doc.Root
                .Elements(ApiNamespace + "ORDER")
                .Select(order => JsonConvert.SerializeXNode(order));
        }
    }
}
此基准测试的结果:
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19041.867 (2004/?/20H1)
Intel Core i9-10885H CPU 2.40GHz, 1 CPU, 16 logical and 8 physical cores
.NET Core SDK=5.0.202
[Host] : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
DefaultJob : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
Method | N | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|
PipeLineOneBenchmark | 1000 | 25.00 μs | 0.269 μs | 0.252 μs | - | - | - | - |
PipeLineOneBenchmark | 100000 | 2,491.42 μs | 13.655 μs | 15.177 μs | - | - | - | - |
这与您的解决方案相似但不同。
不过,这让我觉得,实际问题出在别处。 (仍在努力并准备更新。)
我想到可以用一个小工具来检查你的 RabbitMQ 发送端是否太慢,导致消息大量积压:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace DataFlowExperiment.PipelinesLib
{
    /// <summary>
    /// Buffers messages coming from a Dataflow pipeline and sends them on a dedicated thread. It also reports
    /// queue-size statistics, which makes a growing backlog (a publisher that is too slow) visible.
    /// </summary>
    public class RabbitWrapper : IDisposable
    {
        private readonly int batchSize = 10;
        private readonly Thread senderThread;
        private readonly BlockingCollection<string> messages;
        private readonly ActionBlock<string> receiver;
        private readonly CancellationTokenSource stoppingToken;
        private readonly RabbitWrapperStats stats;
        private bool closed;
        private bool disposed;

        /// <summary>
        /// Target to link the pipeline's output to. It is public so callers can actually feed the wrapper;
        /// after <see cref="Close"/> it declines new messages.
        /// </summary>
        public ITargetBlock<string> Receiver => receiver;

        /// <summary>Creates the send queue and starts the sender thread.</summary>
        public RabbitWrapper()
        {
            // Drop in your logging here
            stats = new RabbitWrapperStats(new Progress<string>(x => Console.WriteLine(x)));
            stoppingToken = new CancellationTokenSource();
            messages = new BlockingCollection<string>();
            receiver = new ActionBlock<string>(Receive);
            // NOTE(review): this is a foreground thread, so the process cannot exit until Close() is called.
            senderThread = new Thread(HandleQueue);
            senderThread.Start();
        }

        // ActionBlock delegate: moves a message from the pipeline into the send queue.
        private void Receive(string message)
        {
            messages.Add(message);
        }

        // Sender loop: sends up to batchSize messages, then samples the queue size, until Close() cancels it.
        private void HandleQueue()
        {
            try
            {
                while (!stoppingToken.Token.IsCancellationRequested)
                {
                    int batchIndex = 0;
                    do
                    {
                        // Blocks until a message arrives or the token is cancelled.
                        string message = messages.Take(stoppingToken.Token);
                        if (!string.IsNullOrEmpty(message))
                        {
                            SendToRabbit(message);
                        }
                        batchIndex++;
                    } while (!stoppingToken.Token.IsCancellationRequested &&
                             batchIndex < batchSize &&
                             messages.Count > 0);
                    // Check statistics after every batch (at most batchSize messages).
                    CheckStats(messages.Count);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown: Take() throws once Close() cancels the token. Left unhandled, this
                // exception would terminate the process, because it is raised on a dedicated thread.
            }
        }

        private void SendToRabbit(string message)
        {
            // rabbit Publish goes here.
        }

        private void CheckStats(int count)
        {
            stats.CheckStats(count);
        }

        /// <summary>
        /// Stops accepting messages and stops the sender thread; messages still queued are not sent.
        /// Safe to call more than once.
        /// </summary>
        public void Close()
        {
            if (closed)
                return;
            closed = true;
            receiver.Complete();
            stoppingToken.Cancel();
            senderThread.Join();
        }

        /// <summary>Closes the wrapper and releases its queue and cancellation source. Safe to call more than once.</summary>
        public void Dispose()
        {
            if (disposed)
                return;
            disposed = true;
            Close();
            try
            {
                // Let the receiver finish handing buffered items to the queue before the queue is disposed.
                receiver.Completion.Wait();
            }
            catch (AggregateException)
            {
                // The receiver faulted; nothing is left to hand over.
            }
            messages.Dispose();
            stoppingToken.Dispose();
        }
    }

    /// <summary>
    /// Tracks the send-queue size and reports a warning, a critical alert and recovery through <see cref="IProgress{T}"/>.
    /// </summary>
    internal class RabbitWrapperStats
    {
        // You may want to play around with these thresholds
        // I pulled them out of thin air ...
        const int SIZE_WARN = 500000;
        const int SIZE_CRITICAL = SIZE_WARN * 2;
        private int lastTenIndex = 0;
        // Ring buffer holding the last ten queue-size samples.
        private readonly int[] lastTen = new int[10] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
        private int meanSizeLastTen = 0;
        private int lastMeanSize = 0;
        // Sign of the change of the mean between samples (-1, 0, 1); computed but not used yet.
        private int tendency = 0;
        private bool HasWarned = false;
        private bool HasPanicked = false;
        private readonly IProgress<string> progress;

        public RabbitWrapperStats(IProgress<string> progress)
        {
            this.progress = progress;
        }

        /// <summary>Records a sample and reports threshold crossings.</summary>
        /// <param name="queueSize">Current number of queued messages.</param>
        public void CheckStats(int queueSize)
        {
            UpdateLastTen(queueSize);
            if (!HasPanicked && queueSize > SIZE_CRITICAL)
            {
                Panic(queueSize);
                return;
            }
            if (!HasWarned && queueSize > SIZE_WARN)
            {
                Warn(queueSize);
                return;
            }
            // Re-arm the alerts once the recent mean drops clearly below the warning level.
            if ((HasPanicked || HasWarned) && meanSizeLastTen < SIZE_WARN * 0.75)
            {
                HasPanicked = false;
                HasWarned = false;
                progress?.Report($"INFO Mean size of last 10 Samples sinks below {SIZE_WARN * 0.75} : {meanSizeLastTen}");
            }
        }

        private void Warn(int size)
        {
            HasWarned = true;
            progress?.Report($"WARNING QueueSize = {size}");
        }

        private void Panic(int size)
        {
            HasPanicked = true;
            progress?.Report($"!! CRITICAL !! QueueSize = {size}");
        }

        // Stores the sample in the ring buffer and updates the mean and tendency.
        private void UpdateLastTen(int value)
        {
            lastTen[lastTenIndex] = value;
            lastTenIndex = (lastTenIndex + 1) % lastTen.Length;
            meanSizeLastTen = lastTen.Sum() / lastTen.Length;
            tendency = meanSizeLastTen.CompareTo(lastMeanSize);
            lastMeanSize = meanSizeLastTen;
        }
    }
}