开启 IObservable<IEnumerable<IEnumerable<T>>>

Switching on IObservable<IEnumerable<IEnumerable<T>>>

我有以下结构:

// source of data
interface IItem
{
    IObservable<string> Changed { get; }
}

interface IItemCollection
{
    List<IItem> Items { get; }
    IObservable<IItem> ItemAdded { get; }
    IObservable<IItem> ItemRemoved { get; }
}

interface IItemCollectionManager
{
    List<IItemCollection> ItemCollectionCollection { get; }
    IObservable<IItemCollection> ItemCollectionAdded { get; }
    IObservable<IItemCollection> ItemCollectionRemoved { get; }
}

// desired result
interface IAggregation
{
    IObservable<string> Changed { get; }
}

这里的目标是 IAggregation 公开单个可观察对象。但是,IItem 可以随时从每个 IItemCollection 中添加和删除,事实上,IItemCollection 也可以随时从 IItemCollectionManager 中添加或删除。当然,当添加这样的 IItemCollection 时,Aggregation 也应该从那个发出值,如果删除 ItemCollection,我不再希望 string 来自要发出的集合中的 IItems。此外,当将 Item 添加到任何 IItemCollection 时,来自其 Changed 可观察值的值也应该产生来自 IAggregationChanged 可观察值的值。

现在,当只有一个 IItemCollection 时,解决这个问题相当简单,例如像这样:

class AggregationImpl : IAggregation 
{
    public AggregationImpl(IItemCollection itemCollection)
    {
        var added = itemCollection.ItemAdded
            .Select(_ => itemCollection.Items);
        var removed = itemCollection.ItemRemoved
            .Select(_ => itemCollection.Items);

        Changed = Observable.Merge(added, removed)
            .StartWith(itemCollection.Items)
            .Select(coll => coll.Select(item => item.Changed).Merge())
            .Switch();
    }

    public IObservable<string> Changed { get; }

}

...这里的关键点是,我将所有 ItemChanged 可观察对象展平为带有 Merge() 的单个可观察对象,然后,每次一个项目是添加或删除,我重新创建整个 Observable 并使用 Switch() 取消订阅旧的并订阅新的。

我觉得扩展到包括 IItemCollectionManager 应该很简单,但我不太确定如何处理它。

我希望这能奏效,或者至少能让你走上正确的道路。由于测试似乎相当复杂,我打算即兴发挥。如果你有一些简单的测试代码,那我很乐意测试。

首先,我不太喜欢您发布的实现。您连接到 ItemAddedItemRemoved obeservables,根本没有使用数据;您正在从 Items 属性 获取数据。这可能导致在较差的实现中出现竞争条件,其中事件在 属性 更新之前发出。因此,我创建了自己的实现。我还建议将其放入扩展方法中,因为这会让以后的生活更轻松:

public static IObservable<string> ToAggregatedObservable(this IItemCollection itemCollection)
{
    return Observable.Merge(
            itemCollection.ItemAdded.Select(item => (op: "+", item)),
            itemCollection.ItemRemoved.Select(item => (op: "-", item))
        )
        .Scan(ImmutableList<IItem>.Empty.AddRange(itemCollection.Items), (list, t) =>
            t.op == "+"
                ? list.Add(t.item)
                : list.Remove(t.item)
        )
        .Select(l => l.Select(item => item.Changed).Merge())
        .Switch();

}

原谅魔弦一秒钟,如果你愿意,你可以把它变成一个enum。我们在 Scan 内的 ImmutableList 中维护当前项目的状态。当一个项目获得 added/removed 时,我们更新列表,然后切换可观察对象。

同样的逻辑也适用于 collection 经理级别:

public static IObservable<string> ToAggregatedObservable(this IItemCollectionManager itemCollectionManager)
{
    return Observable.Merge(
            itemCollectionManager.ItemCollectionAdded.Select(itemColl => (op: "+", itemColl)),
            itemCollectionManager.ItemCollectionRemoved.Select(itemColl => (op: "-", itemColl))
        )
        .Scan(ImmutableList<IItemCollection>.Empty.AddRange(itemCollectionManager.ItemCollectionCollection), (list, t) =>
            t.op == "+"
                ? list.Add(t.itemColl)
                : list.Remove(t.itemColl)
        )
        .Select(l => l.Select(itemColl => itemColl.ToAggregatedObservable()).Merge())
        .Switch();
}

这里我们只是re-using第一种扩展方法,并且使用与之前相同的adding/removing然后切换逻辑。