具有大对象的反应式扩展 SelectMany

Reactive Extensions SelectMany with large objects

我有一小段代码可以模拟使用大对象(那么大 byte[])的流程。对于序列中的每个项目,调用异步方法以获得一些结果。问题?实际上,它抛出 OutOfMemoryException

兼容LINQPad的代码(C#程序):

void Main()
{
    var selectMany = Enumerable.Range(1, 100)
                   .Select(i => new LargeObject(i))
                   .ToObservable()
                   .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));

    selectMany
        .Subscribe(r => Console.WriteLine(r));
}


private static async Task<int> DoSomethingAsync(LargeObject lo)
{
    await Task.Delay(10000);
    return lo.Id;
}

internal class LargeObject
{
    public int Id { get; }

    public LargeObject(int id)
    {
        this.Id = id;
    }

    public byte[] Data { get; } = new byte[10000000];
}

似乎它同时创建了所有对象。我怎样才能以正确的方式做到这一点?

基本思想是调用 DoSomethingAsync 以获得每个对象的一些结果,所以这就是我使用 SelectMany 的原因。为了简单起见,我只介绍了一个Task.Delay,但在现实生活中它是一个可以并发处理一些项目的服务,所以我想引入一些并发机制来利用它。

请注意,理论上,一次处理少量项目不应填满内存。事实上,我们只需要每个 "large object" 就可以得到 DoSomethingAsync 方法的结果。在那之后,大对象就不再被使用了。

您可以这样引入时间间隔延迟:

var source = Enumerable.Range(1, 100)
   .ToObservable()
   .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i)
   .Select(i => new LargeObject(i))
   .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));

因此,不是一次提取所有 100 个整数,而是立即将它们转换为 LargeObject,然后对所有 100 个调用 DoSomethingAsync,而是将整数一个一个地滴出,间隔一秒每个。


这就是 TPL+Rx 解决方案的样子。不用说,它不如单独的 Rx 或单独的 TPL 优雅。但是,我认为这个问题不太适合 Rx:

void Main()
{
    var source = Observable.Range(1, 100);

    const int MaxParallelism = 5;
    var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism });
    source.Subscribe(transformBlock.AsObserver());
    var selectMany = transformBlock.AsObservable();

    selectMany
        .Subscribe(r => Console.WriteLine(r));
}

It seems that it creates all the objects at the same time.

是的,因为您是一次创建它们。

如果我简化您的代码,我可以告诉您原因:

void Main()
{
    var selectMany =
        Enumerable
            .Range(1, 5)
            .Do(x => Console.WriteLine($"{x}!"))
            .ToObservable()
            .SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i)));

    selectMany
        .Subscribe(r => Console.WriteLine(r));
}

private static async Task<int> DoSomethingAsync(int i)
{
    await Task.Delay(1);
    return i;
}

运行 这会产生:

1!
2!
3!
4!
5!
4
3
5
2
1

由于 Observable.FromAsync,您允许源 运行 在任何结果 return 之前完成。换句话说,您正在快速构建所有大型对象,但处理它们的速度很慢。

你应该允许 Rx 运行 同步,但是在默认调度器上,这样你的主线程就不会被阻塞。然后代码将 运行 没有任何内存问题,您的程序将在主线程上保持响应。

这是代码:

var selectMany =
    Observable
        .Range(1, 100, Scheduler.Default)
        .Select(i => new LargeObject(i))
        .Select(o => DoSomethingAsync(o))
        .Select(t => t.Result);

(我已经有效地将 Enumerable.Range(1, 100).ToObservable() 替换为 Observable.Range(1, 100),因为这也有助于解决一些问题。)

我已经尝试测试其他选项,但到目前为止任何允许 DoSomethingAsync 到 运行 异步 运行 的东西都进入内存不足错误。

我觉得自己 。与你上一个问题和我上一个答案类似,你需要做的是限制并发创建的 bigObjects™ 的数量。

为此,您需要将对象的创建和处理结合起来,放在同一个线程池中。现在的问题是,我们使用异步方法来允许线程在我们的异步方法运行 的同时做其他事情。由于您的慢速网络调用是异步的,因此您的(快速)对象创建代码将继续过快地创建大型对象。

相反,我们可以使用 Rx 通过将对象创建与异步调用相结合并使用 .Merge(maxConcurrent) 来限制并发。运行ning 并发 Observable 的数量。

作为奖励,我们还可以设置执行查询的最短时间。只需 Zip,延迟时间最短。

static void Main()
{
    var selectMany = Enumerable.Range(1, 100)
                        .ToObservable()
                        .Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i)))
                            .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)))
                            .Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el)
                        ).Merge(4);

    selectMany
        .Subscribe(r => Console.WriteLine(r));

    Console.ReadLine();
}


private static async Task<int> DoSomethingAsync(LargeObject lo)
{
    await Task.Delay(10000);
    return lo.Id;
}

internal class LargeObject
{
    public int Id { get; }

    public LargeObject(int id)
    {
        this.Id = id;
        Console.WriteLine(id + "!");
    }

    public byte[] Data { get; } = new byte[10000000];
}

ConcatMap 开箱即用地支持这一点。我知道此运算符在 .net 中不可用,但您可以使用 Concat 运算符进行相同操作,该运算符将订阅每个内部源推迟到前一个完成。