如何使用 Reactive Extensions (Rx.Net) 等待一个值或直到经过一段固定的时间
How to wait for a value or until a fixed amount of time has elapsed using Reactive Extensions (Rx.Net)
我想等待(阻塞)一个线程,直到某个时间过去或另一个流抽取一个值,我认为下面的方法可能会实现这一点,但它抛出异常,因为第一个流是空的,
// class level subject manipulated by another thread...
_updates = new Subject<Unit>();
...
// wait for up to 5 seconds before carrying on...
var result = Observable.Timer(DateTime.Now.AddSeconds(5))
.TakeUntil(_updates)
.Wait();
我怎样才能实现最多阻塞 5 秒或直到另一个流抽取值的能力?
您可以这样使用 Observable.Timeout
:
var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();
我使用 Take(1)
因为超时期望序列完成,而不仅仅是产生下一个值。超时时会抛出 System.TimeoutException
.
如果您不想例外 - 您可以使用 Catch
来提供一些值:
var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
.Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all
如果您的 Unit
确实是@Shlomo 提到的 rx 单元 - 您可以这样更改它:
var result = _updates.Select(c => (Unit?) c).Take(1)
.Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();
或者像往常一样捕捉异常。
你能创建两个任务然后等待吗?这可能是实现该目标的 "easiest" 方法。
var waitTask = new Task(() => Thread.Sleep(5000));
var messageTask = new Task(() => <listen for input and return>);
await Task.WhenAny(waitTask, messageTask);
<the rest of your code>
编辑:标题更改为指定 React,答案可能不再适用。
如果您不想处理异常,还有另一种选择:
var _updates = new Subject<Unit>();
var result = Observable.Merge(
_updates.Materialize(),
Observable.Empty<Unit>()
.Delay(TimeSpan.FromSeconds(5))
.Materialize()
)
.Take(1)
.Wait();
switch (result.Kind)
{
case NotificationKind.OnCompleted:
//time's up, or someone called _updates.OnCompleted().
break;
case NotificationKind.OnNext:
var message = result.Value;
//Message received. Handle message
break;
case NotificationKind.OnError:
var exception = result.Exception;
//Exception thrown. Handle exception
break;
}
我想等待(阻塞)一个线程,直到某个时间过去或另一个流抽取一个值,我认为下面的方法可能会实现这一点,但它抛出异常,因为第一个流是空的,
// class level subject manipulated by another thread...
_updates = new Subject<Unit>();
...
// wait for up to 5 seconds before carrying on...
var result = Observable.Timer(DateTime.Now.AddSeconds(5))
.TakeUntil(_updates)
.Wait();
我怎样才能实现最多阻塞 5 秒或直到另一个流抽取值的能力?
您可以这样使用 Observable.Timeout
:
var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();
我使用 Take(1)
因为超时期望序列完成,而不仅仅是产生下一个值。超时时会抛出 System.TimeoutException
.
如果您不想例外 - 您可以使用 Catch
来提供一些值:
var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
.Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all
如果您的 Unit
确实是@Shlomo 提到的 rx 单元 - 您可以这样更改它:
var result = _updates.Select(c => (Unit?) c).Take(1)
.Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();
或者像往常一样捕捉异常。
你能创建两个任务然后等待吗?这可能是实现该目标的 "easiest" 方法。
var waitTask = new Task(() => Thread.Sleep(5000));
var messageTask = new Task(() => <listen for input and return>);
await Task.WhenAny(waitTask, messageTask);
<the rest of your code>
编辑:标题更改为指定 React,答案可能不再适用。
如果您不想处理异常,还有另一种选择:
var _updates = new Subject<Unit>();
var result = Observable.Merge(
_updates.Materialize(),
Observable.Empty<Unit>()
.Delay(TimeSpan.FromSeconds(5))
.Materialize()
)
.Take(1)
.Wait();
switch (result.Kind)
{
case NotificationKind.OnCompleted:
//time's up, or someone called _updates.OnCompleted().
break;
case NotificationKind.OnNext:
var message = result.Value;
//Message received. Handle message
break;
case NotificationKind.OnError:
var exception = result.Exception;
//Exception thrown. Handle exception
break;
}