如何使用 Reactive Extensions (Rx.Net) 等待一个值或直到经过一段固定的时间

How to wait for a value or until a fixed amount of time has elapsed using Reactive Extensions (Rx.Net)

我想等待(阻塞)一个线程,直到某个时间过去或另一个流抽取一个值,我认为下面的方法可能会实现这一点,但它抛出异常,因为第一个流是空的,

 // class level subject manipulated by another thread...
 _updates = new Subject<Unit>();
 ...
 // wait for up to 5 seconds before carrying on...    
 var result = Observable.Timer(DateTime.Now.AddSeconds(5))
    .TakeUntil(_updates)
    .Wait();

我怎样才能实现最多阻塞 5 秒或直到另一个流抽取值的能力?

您可以这样使用 Observable.Timeout

 var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();

我使用 Take(1) 因为超时期望序列完成,而不仅仅是产生下一个值。超时时会抛出 System.TimeoutException.

如果您不想例外 - 您可以使用 Catch 来提供一些值:

var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
    .Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all

如果您的 Unit 确实是@Shlomo 提到的 rx 单元 - 您可以这样更改它:

var result = _updates.Select(c => (Unit?) c).Take(1)
    .Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();

或者像往常一样捕捉异常。

你能创建两个任务然后等待吗?这可能是实现该目标的 "easiest" 方法。

var waitTask = new Task(() => Thread.Sleep(5000));
var messageTask = new Task(() => <listen for input and return>);

await Task.WhenAny(waitTask, messageTask);

<the rest of your code>

编辑:标题更改为指定 React,答案可能不再适用。

如果您不想处理异常,还有另一种选择:

var _updates = new Subject<Unit>();

var result = Observable.Merge(
    _updates.Materialize(),
    Observable.Empty<Unit>()
       .Delay(TimeSpan.FromSeconds(5))
       .Materialize()
   )
   .Take(1)
   .Wait();

switch (result.Kind)
{
    case NotificationKind.OnCompleted:
        //time's up, or someone called _updates.OnCompleted().
        break;
    case NotificationKind.OnNext:
        var message = result.Value;
        //Message received. Handle message
        break;
    case NotificationKind.OnError:
        var exception = result.Exception;
        //Exception thrown. Handle exception
        break;
}