使用 System.Reactive.Linq 订阅第一次事件发生并取消

Subscribe for first event occurence with cancellation using System.Reactive.Linq

给定从事件流生成的 IObservable 序列:

IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()

我得到第一次事件发生:

MyEvent myEvent = await observable.FirstOrDefaultAsync();

但是,我想处理订阅,resp。当用户点击取消按钮时中断 Observable。

目前我使用 .ToTask() 扩展方法的变通方法,但我相信有仅基于 Rective 扩展的更清晰的解决方案。

_tsc = new CancellationTokenSource();
MyEvent myEvent;
try
{
    myEvent = await EventAggregator.GetEvent<MyEvent>()
        .FirstOrDefaultAsync()
        .ToTask(__tsc.Token);
}
catch (TaskCanceledException)
{
    myEvent = null;
}


void Cancel()
{
    _tsc.Cancel();
}

我不确定这是 "cleaner",但你可以在没有 ToTask 的情况下这样做:

static async void Test(IObservable<int> ob, CancellationToken ct)
{
    var first = await ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => {
            o.OnNext(Unit.Default);
            o.OnCompleted();
    }))).FirstOrDefaultAsync();            
} 

所以我们创建另一个可观察对象,它将在取消标记被取消时产生一个元素,然后我们使用 TakeUntil 重载,它将 return 从第一个序列到第二个序列的元素产生一个元素。因此,在您取消令牌后 - 您的 await 语句将 return 具有默认值(null 用于引用类型)。

你可以把它移到扩展方法中,这样看起来会更好:

public static class Extensions {
    public static IObservable<T> TakeUntilCancelled<T>(this IObservable<T> ob, CancellationToken ct) {
        return ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() =>
        {
            o.OnNext(Unit.Default);
            o.OnCompleted();
        })));
    }
}

var first = await ob.TakeUntilCancelled(ct).FirstOrDefaultAsync();      

使用内置的 Rx 运算符,您想做的事情非常简单。

只需这样做:

IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
var endItAll = new Subject<Unit>();
MyEvent myEvent = await observable.TakeUntil(endItAll).FirstOrDefaultAsync();

现在您只需调用 endItAll.OnNext(Unit.Default) 即可结束订阅,然后 return 一个 null MyEvent.