System.Reactive 中的并发订户执行

Concurrent subscriber execution in System.Reactive

我正在编写一个批处理管道,每 Y 秒处理 X 个未完成的操作。感觉 System.Reactive 很适合这个,但我无法让订阅者并行执行。我的代码如下所示:

var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
 });
subject.OnCompleted();

是否有一种优雅的方式以并发方式读取此缓冲 Subject

Rx 订阅代码始终¹ 同步。您需要做的是从 Subscribe 委托中删除处理代码,并使其成为可观察序列的副作用。这是如何完成的:

Subject<int> subject = new();
int concurrentCount = 0;

var processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.Defer(() => Observable.Start(() =>
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask(); // or RunAsync (either one starts the processor)

Parallel.For(0, 1_000_000, new() { MaxDegreeOfParallelism = 2 }, i =>
{
    subject.OnNext(i);
});
subject.OnCompleted();

processor.Wait();

Select+Observable.Defer+Observable.Start 组合将源序列转换为 IObservable<IObservable<Unit>>。它是一个嵌套序列,每个内部序列代表一个 list 的处理。当 Observable.Start 的委托完成时,内部序列发出一个 Unit 值,然后完成。包装 Defer 运算符确保内部序列是“冷的”,以便它们在订阅之前不会启动。然后是 Merge 运算符,它将外部序列展开为平面 IObservable<Unit> 序列。 maxConcurrent 参数配置将同时订阅多少个内部序列。每次 Merge 运算符订阅内部序列时,相应的 Observable.Start 委托都会在 ThreadPool 线程上启动 运行ning。

如果将 maxConcurrent 设置得太高,ThreadPool 可能会 运行 out of workers(换句话说,它可能会变得饱和),并且 然后,您的代码的并发性将取决于 ThreadPool 可用性。如果您愿意,可以使用 ThreadPool.SetMinThreads 方法增加 ThreadPool 根据需要立即创建的工人数量。但是,如果您的工作负载受 CPU 限制,并且您将工作线程增加到 Environment.ProcessorCount 值以上,那么您的 CPU 很可能会饱和。

如果您的工作负载是异步的,您可以将 Observable.Defer+Observable.Start 组合替换为 Observable.FromAsync 运算符,如图 .

¹ 一个 unpublished library exists, the AsyncRx.NET,玩弄异步订阅的想法。它基于新接口 IAsyncObservable<T>IAsyncObserver<T>.

你这样说:

// This never gets printed, because Subscribe is only ever called on a single thread.

这不是真的。没有打印任何内容的原因是 Subscribe 中的代码以锁定方式发生 - 一次只有一个线程在 Subscribe 中执行,因此您几乎立即递增该值然后递减它。而且由于它从零开始,所以它永远没有机会超过 1.

现在这只是因为 Rx 合同。一次只有一个线程订阅。

我们可以解决这个问题。

试试这个代码:

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .SelectMany(list =>
        Observable
            .Start(() =>
            {
                var c = Interlocked.Increment(ref concurrentCount);
                Console.WriteLine("Starting {0} simultaneous batches", c);
            })
            .Finally(() =>
            {
                var c = Interlocked.Decrement(ref concurrentCount);
                Console.WriteLine("Ending {0} simultaneous batches", c);
            }))
    .Subscribe();

现在,当我 运行 它(少于您设置的 1_000_000 迭代次数)时,我得到这样的输出:

Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batches