使用 Reactive Extensions 我可以创建一个可观察对象的订阅者,它会阻塞直到满足某些条件或发生超时

Using Reactive Extensions can I create subscribers to an observable which block until some condition is met or a timeout occurs

我可以I/Should为此使用响应式扩展吗? 基本上我有一个 ESB,我想监视它的消息(这基本上是我的热可观察对象所在的位置),我想创建一堆订阅者,它们将 block/pause 在线程中创建,只有当满足特定条件和/或发生超时。

我知道我可以使用 Tasks/TaskCompletionSource 和一个取消令牌来实现这一点,但我认为 RX 似乎很合适。

编辑:超时不是可观察的,而是应该在订阅者处,一旦满足timeout/condition,他们可以取消订阅并自行处理,以先到者为准。

编辑 2:订阅者和可观察对象可能会在不同的线程上处理——但不一定。如果在订阅者上使用 async/await,我希望能够 pause/block 在订阅者所在的位置执行。基本上,一些工作完成的信号

我所追求的是利用订阅者生命周期管理,因此我无需执行任何操作。 IConnectableObservable 似乎在这方面提供了一些希望。

也许这是方桩圆孔的地盘。

我认为这就是你所追求的,不完全确定。示例代码始终是一个有用的澄清器。诀窍是让您的运营商不在源头,而是通过订阅:

async void Main()
{
    var hotObservable = new Subject<string>();

    Func<string, bool> sub1_Condition = s => s != "Disconnect Sub1";
    var subscription1 = hotObservable
        .Timeout(TimeSpan.FromSeconds(5))
        .TakeWhile(sub1_Condition)
        .Subscribe(s => Console.WriteLine($"Sub 1: {s}"), _ => Console.WriteLine("Sub1 Timeout."), () => Console.WriteLine("Sub1 condition met or source ended."));

    Func<string, bool> sub2_Condition = s => s != "Disconnect Sub2";
    var subscription2 = hotObservable
        .Timeout(TimeSpan.FromSeconds(2))
        .TakeWhile(sub2_Condition)
        .Subscribe(s => Console.WriteLine($"Sub 2: {s}"), _ => Console.WriteLine("Sub2 Timeout."), () => Console.WriteLine("Sub2 condition met or source ended."));

    Func<string, bool> sub3_Condition = s => s != "Disconnect Sub3";
    var subscription3 = hotObservable
        .Timeout(TimeSpan.FromSeconds(7))
        .TakeWhile(sub2_Condition)
        .Subscribe(s => Console.WriteLine($"Sub 3: {s}"), _ => Console.WriteLine("Sub3 Timeout."), () => Console.WriteLine("Sub3 condition met or source ended."));

    hotObservable.OnNext("Hello");
    hotObservable.OnNext("Disconnect Sub1");

    await Task.Delay(TimeSpan.FromSeconds(3));

    hotObservable.OnNext("Just Sub 3 should be left");
    hotObservable.OnCompleted();
}

这会产生以下输出:

Sub 1: Hello
Sub 2: Hello
Sub 3: Hello
Sub1 condition met or source ended.
Sub 2: Disconnect Sub1
Sub 3: Disconnect Sub1
Sub2 Timeout.
Sub 3: Just Sub 3 should be left
Sub3 condition met or source ended.