反应式管道——如何控制并行度?
Reactive pipeline - how to control parallelism?
我正在构建一个简单的处理管道,其中一个项目作为输入获取,它由多个处理器按顺序操作,最后输出。下图描述了整体架构:
它当前的工作方式:管道正在尽可能快地从提供程序中获取项目。一旦获取了一个项目,它就会被传递给处理器。处理项目后,将通知输出。虽然按顺序处理单个项目,但可以并行处理多个项目(取决于从提供程序获取它们的速度)。
创建并从管道返回的 IObservable
如下所示:
return Observable.Create<T>(async observer =>
{
while (_provider.HasNext)
{
T item = await _provider.GetNextAsync();
observer.OnNext(item);
}
}).SelectMany(item => Observable.FromAsync(() =>
_processors.Aggregate(
seed: Task.FromResult(item),
func: (current, processor) => current.ContinueWith( // Append continuations.
previous => processor.ProcessAsync(previous.Result))
.Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}};
缺少的部分:我需要一个控制机制来控制在任何给定时间管道中可以有多少项目(最大).
例如,如果 最大并行处理数为 3,则将导致以下工作流:
- 项目 1 被提取并传递给处理器。
- 项目 2 被提取并传递给处理器。
- 项目 3 被提取并传递给处理器。
- 项目 1 已完成处理。
- 项目 4 被提取并传递给处理器。
- 项目 3 已完成处理。
- 项目 5 被提取并传递给处理器。
- 等等...
您可能需要重新排列您发布的代码,但这是一种方法:
var eventLoopScheduler = new EventLoopScheduler ();
(from semaphore in Observable.Return(new Semaphore(2,2))
from input in GetInputObs()
from getAccess in Observable.Start(() => semaphore.WaitOne(),eventLoopScheduler)
from output in ProcessInputOnPipeline(input)
.SubscribeOn(Scheduler.Default)
.Finally(() => semaphore.Release())
select output)
.Subscribe(x => Console.WriteLine(x), ex => {});
我已将您的管道建模为 1 个 Observable(实际上它由几个链接在一起的较小的 observable 组成)
关键是要确保无论管道如何终止都释放信号量 (Empty/Error),否则流可能会挂起,因此使用 Finally() 在信号量上调用 Release() . (如果它永远不会 OnComplete()/OnError() 可能值得考虑在可观察的管道上添加超时。
编辑:
根据以下评论,我在信号量访问周围添加了一些调度,这样我们就不会阻止将这些输入推送到我们的流中的任何人。我使用了 EventLoopScheduler,以便所有信号量访问请求都将排队并在 1 个线程上执行。
编辑:虽然我更喜欢 Paul 的回答 - 简单、更少的调度、更少的同步(合并在内部使用队列)。
Merge
提供了一个需要 max concurrency 的重载。
它的签名看起来像:IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);
这是你的例子的样子(我也重构了一些其他代码,你可以接受或离开):
return Observable
//Reactive while loop also takes care of the onComplete for you
.While(() => _provider.HasNext,
Observable.FromAsync(_provider.GetNextAsync))
//Makes return items that will only execute after subscription
.Select(item => Observable.Defer(() => {
return _processers.Aggregate(
seed: Observable.Return(item),
func: (current, processor) => current.SelectMany(processor.ProcessAsync));
}))
//Only allow 3 streams to be execute in parallel.
.Merge(3);
要分解它的作用,
While
将检查每次迭代,如果 _provider.HasNext
为真,
如果是这样,那么它将重新订阅以获得下一个值
_provider
,否则它会发出 onCompleted
- 在 select 内部创建了一个新的可观察流,但尚未使用
Defer
进行评估
- 返回的
IObservable<IObservable<T>>
被传递给 Merge
,后者最多同时订阅 3 个 observable。
- 内部可观察对象最终在订阅时进行评估。
选项 1
如果您还需要控制并行请求的数量,您需要更巧妙一些,因为您需要发出信号表明您的 Observable
已准备好接受新值:
return Observable.Create<T>(observer =>
{
var subject = new Subject<Unit>();
var disposable = new CompositeDisposable(subject);
disposable.Add(subject
//This will complete when provider has run out of values
.TakeWhile(_ => _provider.HasNext)
.SelectMany(
_ => _provider.GetNextAsync(),
(_, item) =>
{
return _processors
.Aggregate(
seed: Observable.Return(item),
func: (current, processor) => current.SelectMany(processor.ProcessAsync))
//Could also use `Finally` here, this signals the chain
//to start on the next item.
.Do(dontCare => {}, () => subject.OnNext(Unit.Default));
}
)
.Merge(3)
.Subscribe(observer));
//Queue up 3 requests for the initial kickoff
disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));
return disposable;
});
我正在构建一个简单的处理管道,其中一个项目作为输入获取,它由多个处理器按顺序操作,最后输出。下图描述了整体架构:
它当前的工作方式:管道正在尽可能快地从提供程序中获取项目。一旦获取了一个项目,它就会被传递给处理器。处理项目后,将通知输出。虽然按顺序处理单个项目,但可以并行处理多个项目(取决于从提供程序获取它们的速度)。
创建并从管道返回的 IObservable
如下所示:
return Observable.Create<T>(async observer =>
{
while (_provider.HasNext)
{
T item = await _provider.GetNextAsync();
observer.OnNext(item);
}
}).SelectMany(item => Observable.FromAsync(() =>
_processors.Aggregate(
seed: Task.FromResult(item),
func: (current, processor) => current.ContinueWith( // Append continuations.
previous => processor.ProcessAsync(previous.Result))
.Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}};
缺少的部分:我需要一个控制机制来控制在任何给定时间管道中可以有多少项目(最大).
例如,如果 最大并行处理数为 3,则将导致以下工作流:
- 项目 1 被提取并传递给处理器。
- 项目 2 被提取并传递给处理器。
- 项目 3 被提取并传递给处理器。
- 项目 1 已完成处理。
- 项目 4 被提取并传递给处理器。
- 项目 3 已完成处理。
- 项目 5 被提取并传递给处理器。
- 等等...
您可能需要重新排列您发布的代码,但这是一种方法:
var eventLoopScheduler = new EventLoopScheduler ();
(from semaphore in Observable.Return(new Semaphore(2,2))
from input in GetInputObs()
from getAccess in Observable.Start(() => semaphore.WaitOne(),eventLoopScheduler)
from output in ProcessInputOnPipeline(input)
.SubscribeOn(Scheduler.Default)
.Finally(() => semaphore.Release())
select output)
.Subscribe(x => Console.WriteLine(x), ex => {});
我已将您的管道建模为 1 个 Observable(实际上它由几个链接在一起的较小的 observable 组成)
关键是要确保无论管道如何终止都释放信号量 (Empty/Error),否则流可能会挂起,因此使用 Finally() 在信号量上调用 Release() . (如果它永远不会 OnComplete()/OnError() 可能值得考虑在可观察的管道上添加超时。
编辑:
根据以下评论,我在信号量访问周围添加了一些调度,这样我们就不会阻止将这些输入推送到我们的流中的任何人。我使用了 EventLoopScheduler,以便所有信号量访问请求都将排队并在 1 个线程上执行。
编辑:虽然我更喜欢 Paul 的回答 - 简单、更少的调度、更少的同步(合并在内部使用队列)。
Merge
提供了一个需要 max concurrency 的重载。
它的签名看起来像:IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);
这是你的例子的样子(我也重构了一些其他代码,你可以接受或离开):
return Observable
//Reactive while loop also takes care of the onComplete for you
.While(() => _provider.HasNext,
Observable.FromAsync(_provider.GetNextAsync))
//Makes return items that will only execute after subscription
.Select(item => Observable.Defer(() => {
return _processers.Aggregate(
seed: Observable.Return(item),
func: (current, processor) => current.SelectMany(processor.ProcessAsync));
}))
//Only allow 3 streams to be execute in parallel.
.Merge(3);
要分解它的作用,
While
将检查每次迭代,如果_provider.HasNext
为真, 如果是这样,那么它将重新订阅以获得下一个值_provider
,否则它会发出onCompleted
- 在 select 内部创建了一个新的可观察流,但尚未使用
Defer
进行评估
- 返回的
IObservable<IObservable<T>>
被传递给Merge
,后者最多同时订阅 3 个 observable。 - 内部可观察对象最终在订阅时进行评估。
选项 1
如果您还需要控制并行请求的数量,您需要更巧妙一些,因为您需要发出信号表明您的 Observable
已准备好接受新值:
return Observable.Create<T>(observer =>
{
var subject = new Subject<Unit>();
var disposable = new CompositeDisposable(subject);
disposable.Add(subject
//This will complete when provider has run out of values
.TakeWhile(_ => _provider.HasNext)
.SelectMany(
_ => _provider.GetNextAsync(),
(_, item) =>
{
return _processors
.Aggregate(
seed: Observable.Return(item),
func: (current, processor) => current.SelectMany(processor.ProcessAsync))
//Could also use `Finally` here, this signals the chain
//to start on the next item.
.Do(dontCare => {}, () => subject.OnNext(Unit.Default));
}
)
.Merge(3)
.Subscribe(observer));
//Queue up 3 requests for the initial kickoff
disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));
return disposable;
});