Rx 缓冲区内存泄漏
Rx Buffer Memory Leak
我有一个关于 Rx Buffer
运算符的奇怪问题,我找不到合适的解决方案,而且我不知道我做错了什么。如果第 9 行的 Buffer
没有使用 EventLoopScheduler
它会在一段时间后开始泄漏内存,因为没有从 (item
)?
推送任何项目
第 1 行的 item
是一个 IObservable<Entity>
,它将从下游的 TCP 套接字中检索到的解析数据推送。使 Buffer
使用 EventLoopScheduler
可以解决问题,但会降低整体系统性能。
如何在不必强制 Buffer
运算符使用 EventLoopScheduler
的情况下解决此内存泄漏问题?
var groupedItems = items
.GroupBy(entity => entity._type)
.Select(o => new {Type = o.Key, Categories = o.GroupBy(entity => entity._key)});
var ev = new EventLoopScheduler();
var collections = from item in groupedItems
from category in item.Categories
from entities in category.Buffer(intervalTime, intervalSize, /* ev */)
where entities.Any()
select new LogCollection(item.Type, category.Key, entities);
collections.Buffer(TimeSpan.FromSeconds(1)).Where(o => o.Any()).Subscribe(Insert);
更新:
经过一些调查,Buffer
运算符似乎不是问题,而是 "solves" 在 EventLoopScheduler
上安排时的问题。在绝望中,我发布了关键代码片段,因为我对 Rx 还很陌生,我不知道我是否正确使用了范式——所以如果我误用了它,请纠正我! :)
背景知识:应用程序通过 TCP 套接字检索二进制数据,并在进行一些转换后将其插入数据库。
收到
客户端可以连接到服务器,客户端发送的数据将被转换。如果约定发生任何异常,它将捕获异常并断开客户端。
public IObservable<LogEntity> StartListening(IDataConverter converter)
{
return Observable.Create<LogEntity>(observer =>
{
return _endPoint.ToListenerObservable(_backlog).Subscribe(client =>
{
var stream = client.ToClientObservable(_bufferSize, _waitHandle);
converter.Convert(stream)
.Catch<LogEntity, Exception>(exception =>
{
client.Close(); // dc client
return Observable.Empty<LogEntity>();
})
.Subscribe(observer.OnNext);
});
});
}
下面是负责读取发送到服务器的数据的代码。 WaitHandle
是 EventWaitHandle
的包装器,如果数据库离线,将被阻塞以避免数据在系统中累积。 (当 WaitHandle
阻塞且未检索任何数据时出现问题)
public static IObservable<ArraySegment<byte>> ToClientObservable(this TcpClient client, int size, WaitHandle waitHandle)
{
return client.GetStream().ToStreamObservable(size, waitHandle);
}
public static IObservable<ArraySegment<byte>> ToStreamObservable(this Stream stream, int size, WaitHandle waitHandle)
{
return Observable.Create<ArraySegment<byte>>(async (observer, token) =>
{
var buffer = new byte[size];
try
{
while (!token.IsCancellationRequested)
{
waitHandle.BlockingWait();
var received = await stream.ReadAsync(buffer, 0, size, token);
if (received == 0) break;
observer.OnNext(new ArraySegment<byte>(buffer, 0, received));
}
observer.OnCompleted();
}
catch (Exception error)
{
observer.OnError(error);
}
});
}
转换器
转换器使用 Scan
运算符来解析数据流。它内部可能会发生异常。目前,异常将传播到 StartListing
方法,发送错误数据的客户端将断开连接。
public IObservable<LogMessage> Convert(IObservable<ArraySegment<byte>> bytes)
{
return bytes.Scan(
new
{
Leftovers = new byte[0],
Logs = new List<LogMessage>(),
},
(saved, current) =>
{
// Parse bytes
// Exception here if invalid data retrieved
return new
{
Leftovers = data.ToArray(),
Logs = logs,
};
})
.SelectMany(o => o.Logs);
}
你们能看到任何可能导致内存泄漏的东西吗?这基本上是所有负责检索数据、在将其发送到转换阶段之前对其进行转换的代码(第一个问题)。此外,我使用 dotMemory 工具确认了内存泄漏。
您的示例代码中有一些值得注意的地方。
首先,它不是@Enigmativity 指出的 MVCE,例如items
是什么类型、它的值、它们的属性/(字段?),以及 LogCollection
.
的类型
其次,您似乎 运行 进行了过多的 GroupBy
操作。这创建了可观察序列的 3 层嵌套。我认为你只想 GroupBy
一次并依靠匿名打字为你做正确的事情,即 .GroupBy(entity => new { entity.Type, entity.Key})
。我这样说是因为一旦你按两次分组,你似乎只是再次打开它。
第三,你缓冲两次。两次检查空缓冲区。一次使用调度程序(也许)而另一个不使用?第二个缓冲区似乎多余。
第四,您似乎没有关闭任何 GroupBy
"windows"。这意味着对于这些嵌套分组中的每一个,您都在创建独立的缓冲区。根据您的平台,每个 运行 在 Thread/Task 池中。因此,您在某种程度上将不受限制且未知的并发级别释放到您的程序中。因此,由于这些新组中的每一个都是使用 _type
和 _key
的新组合创建的,因此您正在创建永远不会 stop/dispose/cleanup 并将继续消耗资源的新缓冲区接收器。
第五,我们不知道您的内存问题是否只是因为没有足够的内存压力来强制执行 GC,因此您看到内存压力攀升。
我认为您的查询可以简化为:
from item in items
group item by new { item.Type, item.Key} into grp
from buffer in grp.Buffer(intervalTime, intervalSize, scheduler)
where buffer.Any()
select new LogCollection(grp.Key.Type, grp.Key.Key, buffer);
为了解决内存压力问题,我强烈建议您提供一些使组过期的方法。即使这很简单,只是在一段时间后终止您的订阅,然后立即重新订阅(Retry
和 Publish
可能会有所帮助)。否则,如果您获得只出现一次的 type/key 对,您将支付一组的价格,并因此支付整个订阅生命周期的缓冲区。
最后,在查看内存压力问题时,我建议您实际捕获或分析您的应用程序,而不是浏览任务管理器,它可以向您发送大量虚假信息。尝试 GC.GetTotalMemory(true)
或一些 WMI 挂钩,甚至只是跟踪 GC.CollectionCount
值。
我有一个关于 Rx Buffer
运算符的奇怪问题,我找不到合适的解决方案,而且我不知道我做错了什么。如果第 9 行的 Buffer
没有使用 EventLoopScheduler
它会在一段时间后开始泄漏内存,因为没有从 (item
)?
第 1 行的 item
是一个 IObservable<Entity>
,它将从下游的 TCP 套接字中检索到的解析数据推送。使 Buffer
使用 EventLoopScheduler
可以解决问题,但会降低整体系统性能。
如何在不必强制 Buffer
运算符使用 EventLoopScheduler
的情况下解决此内存泄漏问题?
var groupedItems = items
.GroupBy(entity => entity._type)
.Select(o => new {Type = o.Key, Categories = o.GroupBy(entity => entity._key)});
var ev = new EventLoopScheduler();
var collections = from item in groupedItems
from category in item.Categories
from entities in category.Buffer(intervalTime, intervalSize, /* ev */)
where entities.Any()
select new LogCollection(item.Type, category.Key, entities);
collections.Buffer(TimeSpan.FromSeconds(1)).Where(o => o.Any()).Subscribe(Insert);
更新:
经过一些调查,Buffer
运算符似乎不是问题,而是 "solves" 在 EventLoopScheduler
上安排时的问题。在绝望中,我发布了关键代码片段,因为我对 Rx 还很陌生,我不知道我是否正确使用了范式——所以如果我误用了它,请纠正我! :)
背景知识:应用程序通过 TCP 套接字检索二进制数据,并在进行一些转换后将其插入数据库。
收到
客户端可以连接到服务器,客户端发送的数据将被转换。如果约定发生任何异常,它将捕获异常并断开客户端。
public IObservable<LogEntity> StartListening(IDataConverter converter)
{
return Observable.Create<LogEntity>(observer =>
{
return _endPoint.ToListenerObservable(_backlog).Subscribe(client =>
{
var stream = client.ToClientObservable(_bufferSize, _waitHandle);
converter.Convert(stream)
.Catch<LogEntity, Exception>(exception =>
{
client.Close(); // dc client
return Observable.Empty<LogEntity>();
})
.Subscribe(observer.OnNext);
});
});
}
下面是负责读取发送到服务器的数据的代码。 WaitHandle
是 EventWaitHandle
的包装器,如果数据库离线,将被阻塞以避免数据在系统中累积。 (当 WaitHandle
阻塞且未检索任何数据时出现问题)
public static IObservable<ArraySegment<byte>> ToClientObservable(this TcpClient client, int size, WaitHandle waitHandle)
{
return client.GetStream().ToStreamObservable(size, waitHandle);
}
public static IObservable<ArraySegment<byte>> ToStreamObservable(this Stream stream, int size, WaitHandle waitHandle)
{
return Observable.Create<ArraySegment<byte>>(async (observer, token) =>
{
var buffer = new byte[size];
try
{
while (!token.IsCancellationRequested)
{
waitHandle.BlockingWait();
var received = await stream.ReadAsync(buffer, 0, size, token);
if (received == 0) break;
observer.OnNext(new ArraySegment<byte>(buffer, 0, received));
}
observer.OnCompleted();
}
catch (Exception error)
{
observer.OnError(error);
}
});
}
转换器
转换器使用 Scan
运算符来解析数据流。它内部可能会发生异常。目前,异常将传播到 StartListing
方法,发送错误数据的客户端将断开连接。
public IObservable<LogMessage> Convert(IObservable<ArraySegment<byte>> bytes)
{
return bytes.Scan(
new
{
Leftovers = new byte[0],
Logs = new List<LogMessage>(),
},
(saved, current) =>
{
// Parse bytes
// Exception here if invalid data retrieved
return new
{
Leftovers = data.ToArray(),
Logs = logs,
};
})
.SelectMany(o => o.Logs);
}
你们能看到任何可能导致内存泄漏的东西吗?这基本上是所有负责检索数据、在将其发送到转换阶段之前对其进行转换的代码(第一个问题)。此外,我使用 dotMemory 工具确认了内存泄漏。
您的示例代码中有一些值得注意的地方。
首先,它不是@Enigmativity 指出的 MVCE,例如items
是什么类型、它的值、它们的属性/(字段?),以及 LogCollection
.
其次,您似乎 运行 进行了过多的 GroupBy
操作。这创建了可观察序列的 3 层嵌套。我认为你只想 GroupBy
一次并依靠匿名打字为你做正确的事情,即 .GroupBy(entity => new { entity.Type, entity.Key})
。我这样说是因为一旦你按两次分组,你似乎只是再次打开它。
第三,你缓冲两次。两次检查空缓冲区。一次使用调度程序(也许)而另一个不使用?第二个缓冲区似乎多余。
第四,您似乎没有关闭任何 GroupBy
"windows"。这意味着对于这些嵌套分组中的每一个,您都在创建独立的缓冲区。根据您的平台,每个 运行 在 Thread/Task 池中。因此,您在某种程度上将不受限制且未知的并发级别释放到您的程序中。因此,由于这些新组中的每一个都是使用 _type
和 _key
的新组合创建的,因此您正在创建永远不会 stop/dispose/cleanup 并将继续消耗资源的新缓冲区接收器。
第五,我们不知道您的内存问题是否只是因为没有足够的内存压力来强制执行 GC,因此您看到内存压力攀升。
我认为您的查询可以简化为:
from item in items
group item by new { item.Type, item.Key} into grp
from buffer in grp.Buffer(intervalTime, intervalSize, scheduler)
where buffer.Any()
select new LogCollection(grp.Key.Type, grp.Key.Key, buffer);
为了解决内存压力问题,我强烈建议您提供一些使组过期的方法。即使这很简单,只是在一段时间后终止您的订阅,然后立即重新订阅(Retry
和 Publish
可能会有所帮助)。否则,如果您获得只出现一次的 type/key 对,您将支付一组的价格,并因此支付整个订阅生命周期的缓冲区。
最后,在查看内存压力问题时,我建议您实际捕获或分析您的应用程序,而不是浏览任务管理器,它可以向您发送大量虚假信息。尝试 GC.GetTotalMemory(true)
或一些 WMI 挂钩,甚至只是跟踪 GC.CollectionCount
值。