如何等待完成的 IObserver 调用,包括观察订阅者调用?

How to await finished IObserver call including observing subscriber calls?

当调用 onNext 并且订阅者在另一个 thread/task 上时如何等待订阅者方法完成?我想念像 await onNext() 这样的东西,例如对于主题...

例如,当 运行 在同一线程上时:

    [Test, Explicit]
    public void TestSyncOnSameTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject
            .Subscribe(
                o => Console.WriteLine("Received {1} on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId,
                    o),
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将打印输出:

Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10

当 运行 在单独的线程上时(例如卸载 UI-线程)

    [Test, Explicit]
    public void TestObserveOnSeparateTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject.ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(
                o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                         Task.Delay(1000 * o);
                },
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将出现以下输出结果:

Starting on threadId:10
Received 3 on threadId:11

如您所见...留下 using 块导致主题立即调用 OnNext() returns 并且不等待订阅者任务完成。

如何"await"用户呼叫?订单有保障吗?我没有找到有关 Synchronize() 方法以及此方法真正同步的内容的良好文档。

我在寻找类似的东西:

await subject.OnNext(3);
await subject.OnNext(2);

在 "old school way" 中,我使用了 BlockingCollection 作为队列并且

    public async Task<IResponse> Request(IRequest request)
    {
        var taskCompletionSource = new TaskCompletionSource<IResponse>();
        Task<IResponse> tsk = taskCompletionSource.Task;
        Action<IResponse> responseHandler = taskCompletionSource.SetResult; 
        this.requestProcessor.Process(request, responseHandler);
        return await tsk.ConfigureAwait(false);
    }

请求处理器类似于:

        while (true)
        {
            //blockingCollection is shared between caller and runner
            blockingCollection.TryTake(out request, Timeout.Infinite);
            // call callback when finished to return to awaited method
        }

现在我想改用 RX/Observables,但是这个卸载主题让我印象深刻...更多关于 publish/subscribe 和 Reactive Extensions 但我想我需要像 request/response 这样的东西.有什么方法可以用 RX 做到这一点吗?

要完成你想做的事情,你可以这样做:

Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
    using (
        subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                Task.Delay(1000 * o);
            }, () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
            are.WaitOne();
        }
}
Console.WriteLine("Done.");

为什么要屏蔽? 我的猜测是您这样做是因为您认为您需要这样做,或者因为您正在进行测试并且想要阻止直到测试结束以验证结果。

我先解决考试理论。

在Rx中,有Schedulers的概念(IScheduler接口)。 您可以使用这些来控制应该在何时何地执行工作。 例如。您可以指定可以在 TaskPool/ThreadPool、专用线程或调度程序 (WPF) 上安排工作。 您还可以指定工作是尽快完成,还是在以后的某个时间完成。

这些类型是控制 time/concurrency 的首选方式,因此可以从您的代码中删除 Task.Delay(1000 * o);

要以 Rx idomatic 方式复制您提供的测试,您将使用 TestScheduler 和支持的 ITestObserver<T>ITestObservable<T> 重写它(没有主题,不要让我开始)类型。

// Define other methods and classes here
public void TestObserveOnSeparateTask()
{
    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable<int>(
        ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks, 3),
        ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks, 2),
        ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks, 1),
        ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks)
        );

    var observer = scheduler.CreateObserver<int>();
    var subscription = source
        .ObserveOn(scheduler)
//      .Subscribe(
//          o =>
//          {
//              Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
//              //Task.Delay(1000 * o);
//          },
//          () => Console.WriteLine("OnCompleted on threadId:{0}",
//              Thread.CurrentThread.ManagedThreadId));
        .Subscribe(observer);

    scheduler.Start();

    //Pretty silly test, as we apply no query/projection to the source.
    //  Note we add 1 tick due to it being a relative/cold observable and the ObserveOn scheduling will take one time slice to perform.
    ReactiveAssert.AreElementsEqual(
        new[] {
            ReactiveTest.OnNext(TimeSpan.FromSeconds(1).Ticks + 1, 3),
            ReactiveTest.OnNext(TimeSpan.FromSeconds(2).Ticks + 1, 2),
            ReactiveTest.OnNext(TimeSpan.FromSeconds(3).Ticks + 1, 1),
            ReactiveTest.OnCompleted<int>(TimeSpan.FromSeconds(4).Ticks + 1)
        },
        observer.Messages);
}

好的,但是如果您因为想等待某种结果或想知道源已完成而阻塞怎么办? 在这些情况下,您想要拥抱 Rx 和您可以应用的延续模式。 使用 SelectManyConcat 等运算符,您可以让一些序列仅在其他序列产生值或可能完成后触发。 我将这些模式称为异步门。 您可以在 51:03 的 this video 中看到 Matt Barrett 对它们的解释。 他展示了在完全异步 (Rx) 系统中如何仅在其他先决条件事件发生时才执行某些活动。 我不认为我可以在不完全了解你想要达到的目标的情况下给出一个更好的例子,但我怀疑答案是阻塞的,如果问题涉及 "the UI thread".

关于 Synchronize() 方法,这纯粹是为了实现 IObservable<T> 不遵循序列化合同的序列。 您可以应用该运营商来执行该合同。 理论上应该不需要用,如果大家都按规矩玩的话。

**编辑讨论CQRS/ES**

关于你问题的 CQRS 部分,如果你降低耦合度,你也可以这样做。我想您希望命令的发出有点 request/response。即您想知道命令是否被系统接受并成功处理。这是一个问题。一个单独的问题是异步更新读取模型。 在这种情况下,我可能只使用 Task/Task<T> 作为接受命令的方法的 return 值。 独立地,读取模型将订阅由向聚合根发出的命令产生的事件的可观察序列表示。

作为一个超级人为的例子,我把我能想到的 DDD ES 的最小实现结合在一起来展示 CommandsAggregatesrepositories/EventStore 并观察命令产生的事件。

void Main()
{
    var repository = new EventStore();
    repository.Events
        .OfType<FooEvent>()
        .Subscribe(evt=>UpdateReadModel(evt));

    var aggreate = new MyAggregate(Guid.NewGuid());
    var command = new FooCommand(1);
    aggreate.Handle(command);
    repository.Save(aggreate);

}
public void UpdateReadModel(FooEvent fooEvent)
{
    Console.WriteLine(fooEvent.SomeValue);
}

// Define other methods and classes here
public class FooCommand
{
    public FooCommand(int someValue)
    {
        SomeValue = someValue;
    }
    public int SomeValue { get; private set; }
}
public class FooEvent
{
    public FooEvent(int someValue)
    {
        SomeValue = someValue;
    }
    public int SomeValue { get; private set; }
}
public interface IAggregate
{
    List<object> GetUncommittedEvents();
    void Commit();
}
public class MyAggregate : IAggregate
{
    private readonly Guid _id;
    private readonly List<object> _uncommittedEvents = new List<object>();
    public MyAggregate(Guid id)
    {
        _id = id;
    }

    public List<Object> GetUncommittedEvents()
    {
        return _uncommittedEvents;
    }
    public void Commit()
    {
        _uncommittedEvents.Clear();
    }

    public void Handle(FooCommand command)
    {
        //As a result of processing the FooCommand we emit the FooEvent
        var fooEvent = new FooEvent(command.SomeValue);
        _uncommittedEvents.Add(fooEvent);
    }
}

public class EventStore
{
    private readonly ISubject<object> _events = new ReplaySubject<object>();

    public IObservable<object> Events { get { return _events.AsObservable(); } }
    public void Save(IAggregate aggregate)
    {
        foreach (var evt in aggregate.GetUncommittedEvents())
        {
            _events.OnNext(evt);
        }
        aggregate.Commit();
    }
}

现在在现实世界中,您会使用 GetEventStore.com、NEventStore 或 Cedar.EventStore 之类的东西,但结构保持不变。 你在聚合上推送命令,它产生事件,这些事件被保存到事件源,然后事件源将这些事件(band/Async)发送到 readmodels。