开启 IObservable<IEnumerable<IEnumerable<T>>>
Switching on IObservable<IEnumerable<IEnumerable<T>>>
我有以下结构:
// source of data
interface IItem
{
IObservable<string> Changed { get; }
}
interface IItemCollection
{
List<IItem> Items { get; }
IObservable<IItem> ItemAdded { get; }
IObservable<IItem> ItemRemoved { get; }
}
interface IItemCollectionManager
{
List<IItemCollection> ItemCollectionCollection { get; }
IObservable<IItemCollection> ItemCollectionAdded { get; }
IObservable<IItemCollection> ItemCollectionRemoved { get; }
}
// desired result
interface IAggregation
{
IObservable<string> Changed { get; }
}
这里的目标是 IAggregation
公开单个可观察对象。但是,IItem
可以随时从每个 IItemCollection
中添加和删除,事实上,IItemCollection
也可以随时从 IItemCollectionManager
中添加或删除。当然,当添加这样的 IItemCollection
时,Aggregation
也应该从那个发出值,如果删除 ItemCollection
,我不再希望 string
来自要发出的集合中的 IItem
s。此外,当将 Item
添加到任何 IItemCollection
时,来自其 Changed
可观察值的值也应该产生来自 IAggregation
的 Changed
可观察值的值。
现在,当只有一个 IItemCollection
时,解决这个问题相当简单,例如像这样:
class AggregationImpl : IAggregation
{
public AggregationImpl(IItemCollection itemCollection)
{
var added = itemCollection.ItemAdded
.Select(_ => itemCollection.Items);
var removed = itemCollection.ItemRemoved
.Select(_ => itemCollection.Items);
Changed = Observable.Merge(added, removed)
.StartWith(itemCollection.Items)
.Select(coll => coll.Select(item => item.Changed).Merge())
.Switch();
}
public IObservable<string> Changed { get; }
}
...这里的关键点是,我将所有 Item
的 Changed
可观察对象展平为带有 Merge()
的单个可观察对象,然后,每次一个项目是添加或删除,我重新创建整个 Observable
并使用 Switch()
取消订阅旧的并订阅新的。
我觉得扩展到包括 IItemCollectionManager
应该很简单,但我不太确定如何处理它。
我希望这能奏效,或者至少能让你走上正确的道路。由于测试似乎相当复杂,我打算即兴发挥。如果你有一些简单的测试代码,那我很乐意测试。
首先,我不太喜欢您发布的实现。您连接到 ItemAdded
和 ItemRemoved
obeservables,根本没有使用数据;您正在从 Items
属性 获取数据。这可能导致在较差的实现中出现竞争条件,其中事件在 属性 更新之前发出。因此,我创建了自己的实现。我还建议将其放入扩展方法中,因为这会让以后的生活更轻松:
public static IObservable<string> ToAggregatedObservable(this IItemCollection itemCollection)
{
return Observable.Merge(
itemCollection.ItemAdded.Select(item => (op: "+", item)),
itemCollection.ItemRemoved.Select(item => (op: "-", item))
)
.Scan(ImmutableList<IItem>.Empty.AddRange(itemCollection.Items), (list, t) =>
t.op == "+"
? list.Add(t.item)
: list.Remove(t.item)
)
.Select(l => l.Select(item => item.Changed).Merge())
.Switch();
}
原谅魔弦一秒钟,如果你愿意,你可以把它变成一个enum
。我们在 Scan
内的 ImmutableList
中维护当前项目的状态。当一个项目获得 added/removed 时,我们更新列表,然后切换可观察对象。
同样的逻辑也适用于 collection 经理级别:
public static IObservable<string> ToAggregatedObservable(this IItemCollectionManager itemCollectionManager)
{
return Observable.Merge(
itemCollectionManager.ItemCollectionAdded.Select(itemColl => (op: "+", itemColl)),
itemCollectionManager.ItemCollectionRemoved.Select(itemColl => (op: "-", itemColl))
)
.Scan(ImmutableList<IItemCollection>.Empty.AddRange(itemCollectionManager.ItemCollectionCollection), (list, t) =>
t.op == "+"
? list.Add(t.itemColl)
: list.Remove(t.itemColl)
)
.Select(l => l.Select(itemColl => itemColl.ToAggregatedObservable()).Merge())
.Switch();
}
这里我们只是re-using第一种扩展方法,并且使用与之前相同的adding/removing然后切换逻辑。
我有以下结构:
// source of data
interface IItem
{
IObservable<string> Changed { get; }
}
interface IItemCollection
{
List<IItem> Items { get; }
IObservable<IItem> ItemAdded { get; }
IObservable<IItem> ItemRemoved { get; }
}
interface IItemCollectionManager
{
List<IItemCollection> ItemCollectionCollection { get; }
IObservable<IItemCollection> ItemCollectionAdded { get; }
IObservable<IItemCollection> ItemCollectionRemoved { get; }
}
// desired result
interface IAggregation
{
IObservable<string> Changed { get; }
}
这里的目标是 IAggregation
公开单个可观察对象。但是,IItem
可以随时从每个 IItemCollection
中添加和删除,事实上,IItemCollection
也可以随时从 IItemCollectionManager
中添加或删除。当然,当添加这样的 IItemCollection
时,Aggregation
也应该从那个发出值,如果删除 ItemCollection
,我不再希望 string
来自要发出的集合中的 IItem
s。此外,当将 Item
添加到任何 IItemCollection
时,来自其 Changed
可观察值的值也应该产生来自 IAggregation
的 Changed
可观察值的值。
现在,当只有一个 IItemCollection
时,解决这个问题相当简单,例如像这样:
class AggregationImpl : IAggregation
{
public AggregationImpl(IItemCollection itemCollection)
{
var added = itemCollection.ItemAdded
.Select(_ => itemCollection.Items);
var removed = itemCollection.ItemRemoved
.Select(_ => itemCollection.Items);
Changed = Observable.Merge(added, removed)
.StartWith(itemCollection.Items)
.Select(coll => coll.Select(item => item.Changed).Merge())
.Switch();
}
public IObservable<string> Changed { get; }
}
...这里的关键点是,我将所有 Item
的 Changed
可观察对象展平为带有 Merge()
的单个可观察对象,然后,每次一个项目是添加或删除,我重新创建整个 Observable
并使用 Switch()
取消订阅旧的并订阅新的。
我觉得扩展到包括 IItemCollectionManager
应该很简单,但我不太确定如何处理它。
我希望这能奏效,或者至少能让你走上正确的道路。由于测试似乎相当复杂,我打算即兴发挥。如果你有一些简单的测试代码,那我很乐意测试。
首先,我不太喜欢您发布的实现。您连接到 ItemAdded
和 ItemRemoved
obeservables,根本没有使用数据;您正在从 Items
属性 获取数据。这可能导致在较差的实现中出现竞争条件,其中事件在 属性 更新之前发出。因此,我创建了自己的实现。我还建议将其放入扩展方法中,因为这会让以后的生活更轻松:
public static IObservable<string> ToAggregatedObservable(this IItemCollection itemCollection)
{
return Observable.Merge(
itemCollection.ItemAdded.Select(item => (op: "+", item)),
itemCollection.ItemRemoved.Select(item => (op: "-", item))
)
.Scan(ImmutableList<IItem>.Empty.AddRange(itemCollection.Items), (list, t) =>
t.op == "+"
? list.Add(t.item)
: list.Remove(t.item)
)
.Select(l => l.Select(item => item.Changed).Merge())
.Switch();
}
原谅魔弦一秒钟,如果你愿意,你可以把它变成一个enum
。我们在 Scan
内的 ImmutableList
中维护当前项目的状态。当一个项目获得 added/removed 时,我们更新列表,然后切换可观察对象。
同样的逻辑也适用于 collection 经理级别:
public static IObservable<string> ToAggregatedObservable(this IItemCollectionManager itemCollectionManager)
{
return Observable.Merge(
itemCollectionManager.ItemCollectionAdded.Select(itemColl => (op: "+", itemColl)),
itemCollectionManager.ItemCollectionRemoved.Select(itemColl => (op: "-", itemColl))
)
.Scan(ImmutableList<IItemCollection>.Empty.AddRange(itemCollectionManager.ItemCollectionCollection), (list, t) =>
t.op == "+"
? list.Add(t.itemColl)
: list.Remove(t.itemColl)
)
.Select(l => l.Select(itemColl => itemColl.ToAggregatedObservable()).Merge())
.Switch();
}
这里我们只是re-using第一种扩展方法,并且使用与之前相同的adding/removing然后切换逻辑。