有条件地组合可观察量

Combining observables conditionally

我有两个 observables,一个 IObservable<AlertData> 和另一个 IObservable<SoundRequestData>AlertData 包含一个 属性 Id,它知道哪个 SoundRequestData 属于它。 SoundRequestData 只知道自己并且有一个 Id 属性 可以与 AlertData.

中的匹配

我想将这两种数据类型合并成一个新类型AlertDataViewModel。但是,我不能确定到达两个可观察对象的数据顺序是否相同。我现在也不关心输出中的顺序。

我想要的是匹配AlertDataSoundRequestData

我现在使用的方法是等到其中一个可观察对象完成将所有数据提取到 ObservableCollection 中,这种方法有效但速度很慢。之后,我启动另一个可观察对象并匹配 ID。

有更好的方法吗?我想这可以表示为以下弹珠图:

所以 a.id=1 匹配到 3.id=1b.id=2 匹配到 4.id=2 等等。

先介绍一下IObserver<T>的扩展方法。

public static IObserver<T> Safe<T>(this IObserver<T> observer)
{
    var done = false;
    return Observer.Create<TResult>(
        value =>
        {
            if (!done)
            {
                observer.OnNext(value);
            }
        },
        error =>
        {
            if (!done)
            {
                done = true;
                observer.OnError(error);
            }
        },
        () =>
        {
            if (!done)
            {
                done = true;
                observer.OnCompleted();
            }
        });
}

这只是确保在模式 OnNext*(OnError|OnCompleted) 中调用观察者,并且忽略违反该模式的行为。

我们现在可以通过按键缓冲来自两个序列的值并仅在两个序列之间的键匹配时才发出它们来实现您描述的运算符。

public static IObservable<TResult> Join<T1, T2, TKey, TResult>(
    IObservable<T1> source1,
    IObservable<T2> source2,
    Func<T1, TKey> key1,
    Func<T2, TKey> key2,
    Func<T1, T2, TResult> selector)
{
    return Observable.Create<TResult>(observer =>
    {
        var dict1 = new Dictionary<TKey, T1>();
        var dict2 = new Dictionary<TKey, T2>();
        var gate = new object();
        var safeObserver = observer.Safe();
        Action<TKey> emit = k =>
        {
            T1 value1;
            T2 value2;
            if (dict1.TryGetValue(k, out value1) && dict2.TryGetValue(k, out value2))
            {
                var result = selector(value1, value2);
                safeObserver.OnValue(result);
                dict1.Remove(k);
                dict2.Remove(k);
            }
        };
        return new CompositeDisposable(
            source1.Synchronize(gate).Subscribe(
                value1 =>
                {
                    var k = key1(value1);
                    dict1[k] = value1;
                    emit(k);
                },
                safeObserver.OnError,
                safeObserver.OnCompleted),
            source2.Synchronize(gate).Subscribe(
                value2 =>
                {
                    var k = key2(value2);
                    dict2[k] = value2;
                    emit(k);
                },
                safeObserver.OnError,
                safeObserver.OnCompleted));
    });
}

示例:

IObservable<AlertData> alertDatas = ...;
IObservable<SoundRequestData> = soundRequestDatas = ...;
IObservable<AlertDataViewModel> alertDataViewModels = Join(
    alertDatas,
    soundRequestDatas,
    alertData => alertData.Id,
    soundRequestData => soundRequestData.Id,
    (alertData, soundRequestData) => new AlertDataViewModel
    {
        AlertData = alertData,
        SoundRequestData = soundRequestData
    });

这不是最漂亮的,但它会起作用。

它将return这个class,这只是原来两个的集合:

class Aggregate
{
    public AlertData AlertData {get;set;}
    public SoundRequestData SoundRequestData { get; set; }
    public int Id { get { return AlertData == null ? SoundRequestData.Id : AlertData.Id; } } 
}

这是连接逻辑:

var joined = Observable.Merge(   // Convert the two sources into half-filled aggregates and merge them
        source1.Select(a => new Aggregate() { AlertData = a }),
        source2.Select(s => new Aggregate() { SoundRequestData = s }))
    .GroupBy(a => a.Id)
    // We only need two for each Id
    .Select(group => group.Take(2))  
    // This looks ugly, but is just joining the two messages into one
    .Select(group => group.Aggregate(new Aggregate(), (agg, newData) => new Aggregate() { AlertData = agg.AlertData ?? newData.AlertData, SoundRequestData = agg.SoundRequestData ?? newData.SoundRequestData }))
    // Back to one stream
    .Merge();