使用 Reactive Extensions 我可以创建一个可观察对象的订阅者,它会阻塞直到满足某些条件或发生超时
Using Reactive Extensions can I create subscribers to an observable which block until some condition is met or a timeout occurs
我可以I/Should为此使用响应式扩展吗?
基本上我有一个 ESB,我想监视它的消息(这基本上是我的热可观察对象所在的位置),我想创建一堆订阅者,它们将 block/pause 在线程中创建,只有当满足特定条件和/或发生超时。
我知道我可以使用 Tasks/TaskCompletionSource 和一个取消令牌来实现这一点,但我认为 RX 似乎很合适。
编辑:超时不是可观察的,而是应该在订阅者处,一旦满足timeout/condition,他们可以取消订阅并自行处理,以先到者为准。
编辑 2:订阅者和可观察对象可能会在不同的线程上处理——但不一定。如果在订阅者上使用 async/await,我希望能够 pause/block 在订阅者所在的位置执行。基本上,一些工作完成的信号
我所追求的是利用订阅者生命周期管理,因此我无需执行任何操作。 IConnectableObservable 似乎在这方面提供了一些希望。
也许这是方桩圆孔的地盘。
我认为这就是你所追求的,不完全确定。示例代码始终是一个有用的澄清器。诀窍是让您的运营商不在源头,而是通过订阅:
async void Main()
{
var hotObservable = new Subject<string>();
Func<string, bool> sub1_Condition = s => s != "Disconnect Sub1";
var subscription1 = hotObservable
.Timeout(TimeSpan.FromSeconds(5))
.TakeWhile(sub1_Condition)
.Subscribe(s => Console.WriteLine($"Sub 1: {s}"), _ => Console.WriteLine("Sub1 Timeout."), () => Console.WriteLine("Sub1 condition met or source ended."));
Func<string, bool> sub2_Condition = s => s != "Disconnect Sub2";
var subscription2 = hotObservable
.Timeout(TimeSpan.FromSeconds(2))
.TakeWhile(sub2_Condition)
.Subscribe(s => Console.WriteLine($"Sub 2: {s}"), _ => Console.WriteLine("Sub2 Timeout."), () => Console.WriteLine("Sub2 condition met or source ended."));
Func<string, bool> sub3_Condition = s => s != "Disconnect Sub3";
var subscription3 = hotObservable
.Timeout(TimeSpan.FromSeconds(7))
.TakeWhile(sub2_Condition)
.Subscribe(s => Console.WriteLine($"Sub 3: {s}"), _ => Console.WriteLine("Sub3 Timeout."), () => Console.WriteLine("Sub3 condition met or source ended."));
hotObservable.OnNext("Hello");
hotObservable.OnNext("Disconnect Sub1");
await Task.Delay(TimeSpan.FromSeconds(3));
hotObservable.OnNext("Just Sub 3 should be left");
hotObservable.OnCompleted();
}
这会产生以下输出:
Sub 1: Hello
Sub 2: Hello
Sub 3: Hello
Sub1 condition met or source ended.
Sub 2: Disconnect Sub1
Sub 3: Disconnect Sub1
Sub2 Timeout.
Sub 3: Just Sub 3 should be left
Sub3 condition met or source ended.
我可以I/Should为此使用响应式扩展吗? 基本上我有一个 ESB,我想监视它的消息(这基本上是我的热可观察对象所在的位置),我想创建一堆订阅者,它们将 block/pause 在线程中创建,只有当满足特定条件和/或发生超时。
我知道我可以使用 Tasks/TaskCompletionSource 和一个取消令牌来实现这一点,但我认为 RX 似乎很合适。
编辑:超时不是可观察的,而是应该在订阅者处,一旦满足timeout/condition,他们可以取消订阅并自行处理,以先到者为准。
编辑 2:订阅者和可观察对象可能会在不同的线程上处理——但不一定。如果在订阅者上使用 async/await,我希望能够 pause/block 在订阅者所在的位置执行。基本上,一些工作完成的信号
我所追求的是利用订阅者生命周期管理,因此我无需执行任何操作。 IConnectableObservable 似乎在这方面提供了一些希望。
也许这是方桩圆孔的地盘。
我认为这就是你所追求的,不完全确定。示例代码始终是一个有用的澄清器。诀窍是让您的运营商不在源头,而是通过订阅:
async void Main()
{
var hotObservable = new Subject<string>();
Func<string, bool> sub1_Condition = s => s != "Disconnect Sub1";
var subscription1 = hotObservable
.Timeout(TimeSpan.FromSeconds(5))
.TakeWhile(sub1_Condition)
.Subscribe(s => Console.WriteLine($"Sub 1: {s}"), _ => Console.WriteLine("Sub1 Timeout."), () => Console.WriteLine("Sub1 condition met or source ended."));
Func<string, bool> sub2_Condition = s => s != "Disconnect Sub2";
var subscription2 = hotObservable
.Timeout(TimeSpan.FromSeconds(2))
.TakeWhile(sub2_Condition)
.Subscribe(s => Console.WriteLine($"Sub 2: {s}"), _ => Console.WriteLine("Sub2 Timeout."), () => Console.WriteLine("Sub2 condition met or source ended."));
Func<string, bool> sub3_Condition = s => s != "Disconnect Sub3";
var subscription3 = hotObservable
.Timeout(TimeSpan.FromSeconds(7))
.TakeWhile(sub2_Condition)
.Subscribe(s => Console.WriteLine($"Sub 3: {s}"), _ => Console.WriteLine("Sub3 Timeout."), () => Console.WriteLine("Sub3 condition met or source ended."));
hotObservable.OnNext("Hello");
hotObservable.OnNext("Disconnect Sub1");
await Task.Delay(TimeSpan.FromSeconds(3));
hotObservable.OnNext("Just Sub 3 should be left");
hotObservable.OnCompleted();
}
这会产生以下输出:
Sub 1: Hello
Sub 2: Hello
Sub 3: Hello
Sub1 condition met or source ended.
Sub 2: Disconnect Sub1
Sub 3: Disconnect Sub1
Sub2 Timeout.
Sub 3: Just Sub 3 should be left
Sub3 condition met or source ended.