使用 System.Reactive.Linq 订阅第一次事件发生并取消
Subscribe for first event occurence with cancellation using System.Reactive.Linq
给定从事件流生成的 IObservable 序列:
IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
我得到第一次事件发生:
MyEvent myEvent = await observable.FirstOrDefaultAsync();
但是,我想处理订阅,resp。当用户点击取消按钮时中断 Observable。
目前我使用 .ToTask() 扩展方法的变通方法,但我相信有仅基于 Rective 扩展的更清晰的解决方案。
_tsc = new CancellationTokenSource();
MyEvent myEvent;
try
{
myEvent = await EventAggregator.GetEvent<MyEvent>()
.FirstOrDefaultAsync()
.ToTask(__tsc.Token);
}
catch (TaskCanceledException)
{
myEvent = null;
}
void Cancel()
{
_tsc.Cancel();
}
我不确定这是 "cleaner",但你可以在没有 ToTask
的情况下这样做:
static async void Test(IObservable<int> ob, CancellationToken ct)
{
var first = await ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => {
o.OnNext(Unit.Default);
o.OnCompleted();
}))).FirstOrDefaultAsync();
}
所以我们创建另一个可观察对象,它将在取消标记被取消时产生一个元素,然后我们使用 TakeUntil
重载,它将 return 从第一个序列到第二个序列的元素产生一个元素。因此,在您取消令牌后 - 您的 await
语句将 return 具有默认值(null
用于引用类型)。
你可以把它移到扩展方法中,这样看起来会更好:
public static class Extensions {
public static IObservable<T> TakeUntilCancelled<T>(this IObservable<T> ob, CancellationToken ct) {
return ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() =>
{
o.OnNext(Unit.Default);
o.OnCompleted();
})));
}
}
var first = await ob.TakeUntilCancelled(ct).FirstOrDefaultAsync();
使用内置的 Rx 运算符,您想做的事情非常简单。
只需这样做:
IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
var endItAll = new Subject<Unit>();
MyEvent myEvent = await observable.TakeUntil(endItAll).FirstOrDefaultAsync();
现在您只需调用 endItAll.OnNext(Unit.Default)
即可结束订阅,然后 return 一个 null
MyEvent
.
给定从事件流生成的 IObservable 序列:
IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
我得到第一次事件发生:
MyEvent myEvent = await observable.FirstOrDefaultAsync();
但是,我想处理订阅,resp。当用户点击取消按钮时中断 Observable。
目前我使用 .ToTask() 扩展方法的变通方法,但我相信有仅基于 Rective 扩展的更清晰的解决方案。
_tsc = new CancellationTokenSource();
MyEvent myEvent;
try
{
myEvent = await EventAggregator.GetEvent<MyEvent>()
.FirstOrDefaultAsync()
.ToTask(__tsc.Token);
}
catch (TaskCanceledException)
{
myEvent = null;
}
void Cancel()
{
_tsc.Cancel();
}
我不确定这是 "cleaner",但你可以在没有 ToTask
的情况下这样做:
static async void Test(IObservable<int> ob, CancellationToken ct)
{
var first = await ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => {
o.OnNext(Unit.Default);
o.OnCompleted();
}))).FirstOrDefaultAsync();
}
所以我们创建另一个可观察对象,它将在取消标记被取消时产生一个元素,然后我们使用 TakeUntil
重载,它将 return 从第一个序列到第二个序列的元素产生一个元素。因此,在您取消令牌后 - 您的 await
语句将 return 具有默认值(null
用于引用类型)。
你可以把它移到扩展方法中,这样看起来会更好:
public static class Extensions {
public static IObservable<T> TakeUntilCancelled<T>(this IObservable<T> ob, CancellationToken ct) {
return ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() =>
{
o.OnNext(Unit.Default);
o.OnCompleted();
})));
}
}
var first = await ob.TakeUntilCancelled(ct).FirstOrDefaultAsync();
使用内置的 Rx 运算符,您想做的事情非常简单。
只需这样做:
IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
var endItAll = new Subject<Unit>();
MyEvent myEvent = await observable.TakeUntil(endItAll).FirstOrDefaultAsync();
现在您只需调用 endItAll.OnNext(Unit.Default)
即可结束订阅,然后 return 一个 null
MyEvent
.