如何使用 Reactive 限制消费顺序?
How to limit consuming sequence with Reactive?
我们有一个应用程序,其中我们有一个物化的项目数组,我们将通过 Reactive 管道处理这些项目。有点像这样
EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
new TaskFactory(
new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
// 1. transform on single thread
IConnectableObservable<byte[]> source =
numbers.Select(Transform).ToObservable(eventLoop).Publish();
// 2. naive parallelization, restricts parallelization to Work
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
Buffer(10).
Select(
batch =>
batch.
ToObservable(concurrency).
Buffer(10).
Select(
concurrentBatch =>
concurrentBatch.
Select(Work).
ToArray().
ToObservable(eventLoop)).
Merge()).
Merge();
final.Subscribe();
source.Connect();
Await(final).Wait();
如果你真的很想玩这个,替代方法看起来像
private async static Task Await(IObservable<int> final)
{
await final.LastOrDefaultAsync();
}
private static byte[] Transform(int number)
{
if (number == itemCount)
{
Console.WriteLine("numbers exhausted.");
}
byte[] buffer = new byte[1000000];
Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
return buffer;
}
private static int Work(byte[] buffer)
{
Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(50);
return 1;
}
一点解释。 Range(1, itemCount)
模拟从数据源具体化的原始输入。 Transform
模拟每个输入必须经历的浓缩过程,并导致更大的内存占用。 Work
是对转换后的输入进行操作的 "lengthy" 过程。
理想情况下,我们希望最大限度地减少系统同时持有的已转换输入的数量,同时通过并行化 Work
来最大化吞吐量。内存中转换输入的数量应该是批量大小(10
以上)乘以并发工作线程(threadCount
)。
所以对于 5 个线程,我们应该在任何给定时间保留 50 个 Transform
项;如果在这里,转换是一个 1MB 字节的缓冲区,那么我们预计整个 运行.
的内存消耗约为 50MB
我发现的很不一样。也就是说,Reactive 正在急切地预先消耗所有 numbers
和 Transform
它们(如 numbers exhausted.
消息所证明),导致预先出现大量内存峰值(@1GB for 1000 itemCount
).
我的基本问题是:有没有办法实现我需要的(即最小化消耗,通过多线程批处理节流)?
更新: 对不起詹姆斯的逆转;起初,我不认为 paulpdaniels 和 Enigmativity 的组合 Work(Transform)
应用(这与我们实际实现的性质有关,这比上面提供的简单场景更复杂),但是,经过一些进一步的实验,我也许可以应用相同的原则:即推迟转换直到批处理执行。
您想限制您正在做的工作量这一事实表明您应该拉取 数据,而不是将其推送给您。我会忘记在这种情况下使用 Rx,因为从根本上说,您所描述的不是反应式应用程序。此外,Rx 最适合串行处理项目;它使用顺序事件流。
为什么不让您的数据源可枚举,并使用 PLinq, Parallel.ForEach or DataFlow?所有这些听起来更适合您的问题。
正如@JamesWorld 所说,您很可能想使用 PLinq 来执行此任务,这实际上取决于您是否真的 对真实场景中的数据做出反应 或者只是遍历它。
如果您选择响应式路线,您可以使用 Merge
来控制发生的并行化级别:
var source = numbers
.Select(n =>
Observable.Defer(() => Observable.Start(() => Work(Transform(n)), concurrency)))
//Maximum concurrency
.Merge(10)
//Schedule all the output back onto the event loop scheduler
.ObserveOn(eventLoop);
上面的代码将首先使用所有数字(很抱歉无法避免这种情况),但是,通过将处理包装在 Defer
中并在其后跟一个限制并行化的 Merge
, 一次只能飞行 x
件物品。 Start()
将调度程序作为第二个参数,用于执行提供的方法。最后,由于您基本上只是将 Transform
的值推入 Work
,所以我在 Start
方法中组合了它们。
作为旁注,您可以 await
和 Observable
它将等同于您拥有的代码,即:
await source; //== await source.LastAsync();
您在代码中犯了几个错误,这些错误使您得出的所有结论都不成立。
首先,您已经做到了:
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
您使用了 Enumerable.Range
,这意味着当您调用 numbers.Select(Transform)
时,您将以单个线程可以承受的速度消耗所有 numbers
。 Rx 甚至没有机会做任何工作,因为到目前为止你的管道是完全可枚举的。
下一期在您的订阅中:
final.Subscribe();
source.Connect();
Await(final).Wait();
因为您调用了 final.Subscribe()
和 Await(final).Wait();
,所以您正在为 final
可观察对象创建两个单独的订阅。
由于中间有一个 source.Connect()
,第二个订阅可能会丢失值。
所以,让我们尝试删除这里发生的所有问题,看看我们是否可以解决问题。
如果你这样做:
IObservable<int> final =
Observable
.Range(1, itemCount)
.Select(n => Transform(n))
.Select(bs => Work(bs));
一切顺利。数字在最后就用完了,在我的机器上处理 20 个项目大约需要 1 秒。
但这是按顺序处理所有内容。 Work
步骤为 Transform
提供了背压,以减慢它消耗数字的速度。
让我们添加并发。
IObservable<int> final =
Observable
.Range(1, itemCount)
.Select(n => Transform(n))
.SelectMany(bs => Observable.Start(() => Work(bs)));
这在 0.284 秒内处理了 20 个项目,处理完 5 个项目后,数字会自行耗尽。数字不再有任何背压。基本上,调度程序将所有工作交给 Observable.Start
,因此它可以立即为下一个数字做好准备。
让我们降低并发度。
IObservable<int> final =
Observable
.Range(1, itemCount)
.Select(n => Transform(n))
.SelectMany(bs => Observable.Start(() => Work(bs), concurrency));
现在 20 个项目在 0.5 秒内得到处理。在号码用完之前,只有两个得到处理。这是有道理的,因为我们将并发限制为两个线程。但是仍然没有对数字消耗的背压,所以它们很快就被消化掉了。
说了这么多,我试图构造一个具有适当背压的查询,但我找不到办法。关键在于 Transform(...)
比 Work(...)
执行得快得多,因此它完成得更快。
那么对我来说显而易见的举动是:
IObservable<int> final =
Observable
.Range(1, itemCount)
.SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));
这直到最后才完成数字,并将处理限制为两个线程。它似乎为你想要的做了正确的事情,除了我必须一起做 Work(Transform(...))
。
我们有一个应用程序,其中我们有一个物化的项目数组,我们将通过 Reactive 管道处理这些项目。有点像这样
EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
new TaskFactory(
new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
// 1. transform on single thread
IConnectableObservable<byte[]> source =
numbers.Select(Transform).ToObservable(eventLoop).Publish();
// 2. naive parallelization, restricts parallelization to Work
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
Buffer(10).
Select(
batch =>
batch.
ToObservable(concurrency).
Buffer(10).
Select(
concurrentBatch =>
concurrentBatch.
Select(Work).
ToArray().
ToObservable(eventLoop)).
Merge()).
Merge();
final.Subscribe();
source.Connect();
Await(final).Wait();
如果你真的很想玩这个,替代方法看起来像
private async static Task Await(IObservable<int> final)
{
await final.LastOrDefaultAsync();
}
private static byte[] Transform(int number)
{
if (number == itemCount)
{
Console.WriteLine("numbers exhausted.");
}
byte[] buffer = new byte[1000000];
Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
return buffer;
}
private static int Work(byte[] buffer)
{
Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(50);
return 1;
}
一点解释。 Range(1, itemCount)
模拟从数据源具体化的原始输入。 Transform
模拟每个输入必须经历的浓缩过程,并导致更大的内存占用。 Work
是对转换后的输入进行操作的 "lengthy" 过程。
理想情况下,我们希望最大限度地减少系统同时持有的已转换输入的数量,同时通过并行化 Work
来最大化吞吐量。内存中转换输入的数量应该是批量大小(10
以上)乘以并发工作线程(threadCount
)。
所以对于 5 个线程,我们应该在任何给定时间保留 50 个 Transform
项;如果在这里,转换是一个 1MB 字节的缓冲区,那么我们预计整个 运行.
我发现的很不一样。也就是说,Reactive 正在急切地预先消耗所有 numbers
和 Transform
它们(如 numbers exhausted.
消息所证明),导致预先出现大量内存峰值(@1GB for 1000 itemCount
).
我的基本问题是:有没有办法实现我需要的(即最小化消耗,通过多线程批处理节流)?
更新: 对不起詹姆斯的逆转;起初,我不认为 paulpdaniels 和 Enigmativity 的组合 Work(Transform)
应用(这与我们实际实现的性质有关,这比上面提供的简单场景更复杂),但是,经过一些进一步的实验,我也许可以应用相同的原则:即推迟转换直到批处理执行。
您想限制您正在做的工作量这一事实表明您应该拉取 数据,而不是将其推送给您。我会忘记在这种情况下使用 Rx,因为从根本上说,您所描述的不是反应式应用程序。此外,Rx 最适合串行处理项目;它使用顺序事件流。
为什么不让您的数据源可枚举,并使用 PLinq, Parallel.ForEach or DataFlow?所有这些听起来更适合您的问题。
正如@JamesWorld 所说,您很可能想使用 PLinq 来执行此任务,这实际上取决于您是否真的 对真实场景中的数据做出反应 或者只是遍历它。
如果您选择响应式路线,您可以使用 Merge
来控制发生的并行化级别:
var source = numbers
.Select(n =>
Observable.Defer(() => Observable.Start(() => Work(Transform(n)), concurrency)))
//Maximum concurrency
.Merge(10)
//Schedule all the output back onto the event loop scheduler
.ObserveOn(eventLoop);
上面的代码将首先使用所有数字(很抱歉无法避免这种情况),但是,通过将处理包装在 Defer
中并在其后跟一个限制并行化的 Merge
, 一次只能飞行 x
件物品。 Start()
将调度程序作为第二个参数,用于执行提供的方法。最后,由于您基本上只是将 Transform
的值推入 Work
,所以我在 Start
方法中组合了它们。
作为旁注,您可以 await
和 Observable
它将等同于您拥有的代码,即:
await source; //== await source.LastAsync();
您在代码中犯了几个错误,这些错误使您得出的所有结论都不成立。
首先,您已经做到了:
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
您使用了 Enumerable.Range
,这意味着当您调用 numbers.Select(Transform)
时,您将以单个线程可以承受的速度消耗所有 numbers
。 Rx 甚至没有机会做任何工作,因为到目前为止你的管道是完全可枚举的。
下一期在您的订阅中:
final.Subscribe();
source.Connect();
Await(final).Wait();
因为您调用了 final.Subscribe()
和 Await(final).Wait();
,所以您正在为 final
可观察对象创建两个单独的订阅。
由于中间有一个 source.Connect()
,第二个订阅可能会丢失值。
所以,让我们尝试删除这里发生的所有问题,看看我们是否可以解决问题。
如果你这样做:
IObservable<int> final =
Observable
.Range(1, itemCount)
.Select(n => Transform(n))
.Select(bs => Work(bs));
一切顺利。数字在最后就用完了,在我的机器上处理 20 个项目大约需要 1 秒。
但这是按顺序处理所有内容。 Work
步骤为 Transform
提供了背压,以减慢它消耗数字的速度。
让我们添加并发。
IObservable<int> final =
Observable
.Range(1, itemCount)
.Select(n => Transform(n))
.SelectMany(bs => Observable.Start(() => Work(bs)));
这在 0.284 秒内处理了 20 个项目,处理完 5 个项目后,数字会自行耗尽。数字不再有任何背压。基本上,调度程序将所有工作交给 Observable.Start
,因此它可以立即为下一个数字做好准备。
让我们降低并发度。
IObservable<int> final =
Observable
.Range(1, itemCount)
.Select(n => Transform(n))
.SelectMany(bs => Observable.Start(() => Work(bs), concurrency));
现在 20 个项目在 0.5 秒内得到处理。在号码用完之前,只有两个得到处理。这是有道理的,因为我们将并发限制为两个线程。但是仍然没有对数字消耗的背压,所以它们很快就被消化掉了。
说了这么多,我试图构造一个具有适当背压的查询,但我找不到办法。关键在于 Transform(...)
比 Work(...)
执行得快得多,因此它完成得更快。
那么对我来说显而易见的举动是:
IObservable<int> final =
Observable
.Range(1, itemCount)
.SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));
这直到最后才完成数字,并将处理限制为两个线程。它似乎为你想要的做了正确的事情,除了我必须一起做 Work(Transform(...))
。