System.Reactive 中的并发订户执行
Concurrent subscriber execution in System.Reactive
我正在编写一个批处理管道,每 Y 秒处理 X 个未完成的操作。感觉 System.Reactive
很适合这个,但我无法让订阅者并行执行。我的代码如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
是否有一种优雅的方式以并发方式读取此缓冲 Subject
?
Rx 订阅代码始终¹ 同步。您需要做的是从 Subscribe
委托中删除处理代码,并使其成为可观察序列的副作用。这是如何完成的:
Subject<int> subject = new();
int concurrentCount = 0;
var processor = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Select(list => Observable.Defer(() => Observable.Start(() =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
Interlocked.Decrement(ref concurrentCount);
})))
.Merge(maxConcurrent: 2)
.DefaultIfEmpty() // Prevents exception in corner case (empty source)
.ToTask(); // or RunAsync (either one starts the processor)
Parallel.For(0, 1_000_000, new() { MaxDegreeOfParallelism = 2 }, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
processor.Wait();
Select
+Observable.Defer
+Observable.Start
组合将源序列转换为 IObservable<IObservable<Unit>>
。它是一个嵌套序列,每个内部序列代表一个 list
的处理。当 Observable.Start
的委托完成时,内部序列发出一个 Unit
值,然后完成。包装 Defer
运算符确保内部序列是“冷的”,以便它们在订阅之前不会启动。然后是 Merge
运算符,它将外部序列展开为平面 IObservable<Unit>
序列。 maxConcurrent
参数配置将同时订阅多少个内部序列。每次 Merge
运算符订阅内部序列时,相应的 Observable.Start
委托都会在 ThreadPool
线程上启动 运行ning。
如果将 maxConcurrent
设置得太高,ThreadPool
可能会 运行 out of workers(换句话说,它可能会变得饱和),并且
然后,您的代码的并发性将取决于 ThreadPool
可用性。如果您愿意,可以使用 ThreadPool.SetMinThreads
方法增加 ThreadPool
根据需要立即创建的工人数量。但是,如果您的工作负载受 CPU 限制,并且您将工作线程增加到 Environment.ProcessorCount
值以上,那么您的 CPU 很可能会饱和。
如果您的工作负载是异步的,您可以将 Observable.Defer
+Observable.Start
组合替换为 Observable.FromAsync
运算符,如图 .
¹ 一个 unpublished library exists, the AsyncRx.NET,玩弄异步订阅的想法。它基于新接口 IAsyncObservable<T>
和 IAsyncObserver<T>
.
你这样说:
// This never gets printed, because Subscribe is only ever called on a single thread.
这不是真的。没有打印任何内容的原因是 Subscribe
中的代码以锁定方式发生 - 一次只有一个线程在 Subscribe
中执行,因此您几乎立即递增该值然后递减它。而且由于它从零开始,所以它永远没有机会超过 1
.
现在这只是因为 Rx 合同。一次只有一个线程订阅。
我们可以解决这个问题。
试试这个代码:
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.SelectMany(list =>
Observable
.Start(() =>
{
var c = Interlocked.Increment(ref concurrentCount);
Console.WriteLine("Starting {0} simultaneous batches", c);
})
.Finally(() =>
{
var c = Interlocked.Decrement(ref concurrentCount);
Console.WriteLine("Ending {0} simultaneous batches", c);
}))
.Subscribe();
现在,当我 运行 它(少于您设置的 1_000_000
迭代次数)时,我得到这样的输出:
Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batches
我正在编写一个批处理管道,每 Y 秒处理 X 个未完成的操作。感觉 System.Reactive
很适合这个,但我无法让订阅者并行执行。我的代码如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
是否有一种优雅的方式以并发方式读取此缓冲 Subject
?
Rx 订阅代码始终¹ 同步。您需要做的是从 Subscribe
委托中删除处理代码,并使其成为可观察序列的副作用。这是如何完成的:
Subject<int> subject = new();
int concurrentCount = 0;
var processor = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Select(list => Observable.Defer(() => Observable.Start(() =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
Interlocked.Decrement(ref concurrentCount);
})))
.Merge(maxConcurrent: 2)
.DefaultIfEmpty() // Prevents exception in corner case (empty source)
.ToTask(); // or RunAsync (either one starts the processor)
Parallel.For(0, 1_000_000, new() { MaxDegreeOfParallelism = 2 }, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
processor.Wait();
Select
+Observable.Defer
+Observable.Start
组合将源序列转换为 IObservable<IObservable<Unit>>
。它是一个嵌套序列,每个内部序列代表一个 list
的处理。当 Observable.Start
的委托完成时,内部序列发出一个 Unit
值,然后完成。包装 Defer
运算符确保内部序列是“冷的”,以便它们在订阅之前不会启动。然后是 Merge
运算符,它将外部序列展开为平面 IObservable<Unit>
序列。 maxConcurrent
参数配置将同时订阅多少个内部序列。每次 Merge
运算符订阅内部序列时,相应的 Observable.Start
委托都会在 ThreadPool
线程上启动 运行ning。
如果将 maxConcurrent
设置得太高,ThreadPool
可能会 运行 out of workers(换句话说,它可能会变得饱和),并且
然后,您的代码的并发性将取决于 ThreadPool
可用性。如果您愿意,可以使用 ThreadPool.SetMinThreads
方法增加 ThreadPool
根据需要立即创建的工人数量。但是,如果您的工作负载受 CPU 限制,并且您将工作线程增加到 Environment.ProcessorCount
值以上,那么您的 CPU 很可能会饱和。
如果您的工作负载是异步的,您可以将 Observable.Defer
+Observable.Start
组合替换为 Observable.FromAsync
运算符,如图
¹ 一个 unpublished library exists, the AsyncRx.NET,玩弄异步订阅的想法。它基于新接口 IAsyncObservable<T>
和 IAsyncObserver<T>
.
你这样说:
// This never gets printed, because Subscribe is only ever called on a single thread.
这不是真的。没有打印任何内容的原因是 Subscribe
中的代码以锁定方式发生 - 一次只有一个线程在 Subscribe
中执行,因此您几乎立即递增该值然后递减它。而且由于它从零开始,所以它永远没有机会超过 1
.
现在这只是因为 Rx 合同。一次只有一个线程订阅。
我们可以解决这个问题。
试试这个代码:
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.SelectMany(list =>
Observable
.Start(() =>
{
var c = Interlocked.Increment(ref concurrentCount);
Console.WriteLine("Starting {0} simultaneous batches", c);
})
.Finally(() =>
{
var c = Interlocked.Decrement(ref concurrentCount);
Console.WriteLine("Ending {0} simultaneous batches", c);
}))
.Subscribe();
现在,当我 运行 它(少于您设置的 1_000_000
迭代次数)时,我得到这样的输出:
Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batches