具有大对象的反应式扩展 SelectMany
Reactive Extensions SelectMany with large objects
我有一小段代码可以模拟使用大对象(那么大 byte[]
)的流程。对于序列中的每个项目,调用异步方法以获得一些结果。问题?实际上,它抛出 OutOfMemoryException
。
兼容LINQPad的代码(C#程序):
void Main()
{
var selectMany = Enumerable.Range(1, 100)
.Select(i => new LargeObject(i))
.ToObservable()
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
selectMany
.Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(LargeObject lo)
{
await Task.Delay(10000);
return lo.Id;
}
internal class LargeObject
{
public int Id { get; }
public LargeObject(int id)
{
this.Id = id;
}
public byte[] Data { get; } = new byte[10000000];
}
似乎它同时创建了所有对象。我怎样才能以正确的方式做到这一点?
基本思想是调用 DoSomethingAsync 以获得每个对象的一些结果,所以这就是我使用 SelectMany 的原因。为了简单起见,我只介绍了一个Task.Delay,但在现实生活中它是一个可以并发处理一些项目的服务,所以我想引入一些并发机制来利用它。
请注意,理论上,一次处理少量项目不应填满内存。事实上,我们只需要每个 "large object" 就可以得到 DoSomethingAsync 方法的结果。在那之后,大对象就不再被使用了。
您可以这样引入时间间隔延迟:
var source = Enumerable.Range(1, 100)
.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i)
.Select(i => new LargeObject(i))
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
因此,不是一次提取所有 100 个整数,而是立即将它们转换为 LargeObject
,然后对所有 100 个调用 DoSomethingAsync
,而是将整数一个一个地滴出,间隔一秒每个。
这就是 TPL+Rx 解决方案的样子。不用说,它不如单独的 Rx 或单独的 TPL 优雅。但是,我认为这个问题不太适合 Rx:
void Main()
{
var source = Observable.Range(1, 100);
const int MaxParallelism = 5;
var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism });
source.Subscribe(transformBlock.AsObserver());
var selectMany = transformBlock.AsObservable();
selectMany
.Subscribe(r => Console.WriteLine(r));
}
It seems that it creates all the objects at the same time.
是的,因为您是一次创建它们。
如果我简化您的代码,我可以告诉您原因:
void Main()
{
var selectMany =
Enumerable
.Range(1, 5)
.Do(x => Console.WriteLine($"{x}!"))
.ToObservable()
.SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i)));
selectMany
.Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(int i)
{
await Task.Delay(1);
return i;
}
运行 这会产生:
1!
2!
3!
4!
5!
4
3
5
2
1
由于 Observable.FromAsync
,您允许源 运行 在任何结果 return 之前完成。换句话说,您正在快速构建所有大型对象,但处理它们的速度很慢。
你应该允许 Rx 运行 同步,但是在默认调度器上,这样你的主线程就不会被阻塞。然后代码将 运行 没有任何内存问题,您的程序将在主线程上保持响应。
这是代码:
var selectMany =
Observable
.Range(1, 100, Scheduler.Default)
.Select(i => new LargeObject(i))
.Select(o => DoSomethingAsync(o))
.Select(t => t.Result);
(我已经有效地将 Enumerable.Range(1, 100).ToObservable()
替换为 Observable.Range(1, 100)
,因为这也有助于解决一些问题。)
我已经尝试测试其他选项,但到目前为止任何允许 DoSomethingAsync
到 运行 异步 运行 的东西都进入内存不足错误。
我觉得自己 。与你上一个问题和我上一个答案类似,你需要做的是限制并发创建的 bigObjects™ 的数量。
为此,您需要将对象的创建和处理结合起来,放在同一个线程池中。现在的问题是,我们使用异步方法来允许线程在我们的异步方法运行 的同时做其他事情。由于您的慢速网络调用是异步的,因此您的(快速)对象创建代码将继续过快地创建大型对象。
相反,我们可以使用 Rx 通过将对象创建与异步调用相结合并使用 .Merge(maxConcurrent)
来限制并发。运行ning 并发 Observable 的数量。
作为奖励,我们还可以设置执行查询的最短时间。只需 Zip
,延迟时间最短。
static void Main()
{
var selectMany = Enumerable.Range(1, 100)
.ToObservable()
.Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i)))
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)))
.Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el)
).Merge(4);
selectMany
.Subscribe(r => Console.WriteLine(r));
Console.ReadLine();
}
private static async Task<int> DoSomethingAsync(LargeObject lo)
{
await Task.Delay(10000);
return lo.Id;
}
internal class LargeObject
{
public int Id { get; }
public LargeObject(int id)
{
this.Id = id;
Console.WriteLine(id + "!");
}
public byte[] Data { get; } = new byte[10000000];
}
ConcatMap 开箱即用地支持这一点。我知道此运算符在 .net 中不可用,但您可以使用 Concat 运算符进行相同操作,该运算符将订阅每个内部源推迟到前一个完成。
我有一小段代码可以模拟使用大对象(那么大 byte[]
)的流程。对于序列中的每个项目,调用异步方法以获得一些结果。问题?实际上,它抛出 OutOfMemoryException
。
兼容LINQPad的代码(C#程序):
void Main()
{
var selectMany = Enumerable.Range(1, 100)
.Select(i => new LargeObject(i))
.ToObservable()
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
selectMany
.Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(LargeObject lo)
{
await Task.Delay(10000);
return lo.Id;
}
internal class LargeObject
{
public int Id { get; }
public LargeObject(int id)
{
this.Id = id;
}
public byte[] Data { get; } = new byte[10000000];
}
似乎它同时创建了所有对象。我怎样才能以正确的方式做到这一点?
基本思想是调用 DoSomethingAsync 以获得每个对象的一些结果,所以这就是我使用 SelectMany 的原因。为了简单起见,我只介绍了一个Task.Delay,但在现实生活中它是一个可以并发处理一些项目的服务,所以我想引入一些并发机制来利用它。
请注意,理论上,一次处理少量项目不应填满内存。事实上,我们只需要每个 "large object" 就可以得到 DoSomethingAsync 方法的结果。在那之后,大对象就不再被使用了。
您可以这样引入时间间隔延迟:
var source = Enumerable.Range(1, 100)
.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i)
.Select(i => new LargeObject(i))
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
因此,不是一次提取所有 100 个整数,而是立即将它们转换为 LargeObject
,然后对所有 100 个调用 DoSomethingAsync
,而是将整数一个一个地滴出,间隔一秒每个。
这就是 TPL+Rx 解决方案的样子。不用说,它不如单独的 Rx 或单独的 TPL 优雅。但是,我认为这个问题不太适合 Rx:
void Main()
{
var source = Observable.Range(1, 100);
const int MaxParallelism = 5;
var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism });
source.Subscribe(transformBlock.AsObserver());
var selectMany = transformBlock.AsObservable();
selectMany
.Subscribe(r => Console.WriteLine(r));
}
It seems that it creates all the objects at the same time.
是的,因为您是一次创建它们。
如果我简化您的代码,我可以告诉您原因:
void Main()
{
var selectMany =
Enumerable
.Range(1, 5)
.Do(x => Console.WriteLine($"{x}!"))
.ToObservable()
.SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i)));
selectMany
.Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(int i)
{
await Task.Delay(1);
return i;
}
运行 这会产生:
1! 2! 3! 4! 5! 4 3 5 2 1
由于 Observable.FromAsync
,您允许源 运行 在任何结果 return 之前完成。换句话说,您正在快速构建所有大型对象,但处理它们的速度很慢。
你应该允许 Rx 运行 同步,但是在默认调度器上,这样你的主线程就不会被阻塞。然后代码将 运行 没有任何内存问题,您的程序将在主线程上保持响应。
这是代码:
var selectMany =
Observable
.Range(1, 100, Scheduler.Default)
.Select(i => new LargeObject(i))
.Select(o => DoSomethingAsync(o))
.Select(t => t.Result);
(我已经有效地将 Enumerable.Range(1, 100).ToObservable()
替换为 Observable.Range(1, 100)
,因为这也有助于解决一些问题。)
我已经尝试测试其他选项,但到目前为止任何允许 DoSomethingAsync
到 运行 异步 运行 的东西都进入内存不足错误。
我觉得自己
为此,您需要将对象的创建和处理结合起来,放在同一个线程池中。现在的问题是,我们使用异步方法来允许线程在我们的异步方法运行 的同时做其他事情。由于您的慢速网络调用是异步的,因此您的(快速)对象创建代码将继续过快地创建大型对象。
相反,我们可以使用 Rx 通过将对象创建与异步调用相结合并使用 .Merge(maxConcurrent)
来限制并发。运行ning 并发 Observable 的数量。
作为奖励,我们还可以设置执行查询的最短时间。只需 Zip
,延迟时间最短。
static void Main()
{
var selectMany = Enumerable.Range(1, 100)
.ToObservable()
.Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i)))
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)))
.Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el)
).Merge(4);
selectMany
.Subscribe(r => Console.WriteLine(r));
Console.ReadLine();
}
private static async Task<int> DoSomethingAsync(LargeObject lo)
{
await Task.Delay(10000);
return lo.Id;
}
internal class LargeObject
{
public int Id { get; }
public LargeObject(int id)
{
this.Id = id;
Console.WriteLine(id + "!");
}
public byte[] Data { get; } = new byte[10000000];
}
ConcatMap 开箱即用地支持这一点。我知道此运算符在 .net 中不可用,但您可以使用 Concat 运算符进行相同操作,该运算符将订阅每个内部源推迟到前一个完成。