如何等待 Observable 的 Observable?

How to await on IObservable of IObservables?

我有一个 IObservable 个对象,我想将其异步转换为列表字典。

这是我的代码,其中 GetSource returns 和 IObservable:

await GetSource(...)
  .GroupBy(o => o.SiteId)
  .ToDictionary(g => g.Key, g => g.ToList())

当然是错的,因为结果是IDictionary<int, IObservable<IList<T>>>,但是我需要一个IDictionary<int, IList<T>>

本质上,我需要 await 每个 IObservable<IList<T>>,但我不知道如何优雅地做到这一点,我相信 Rx.NET 是可能的。

有什么想法吗?

请注意,无论您做什么,在源完成之前都无法完成字典。因此,一种简单的方法是像这样异步获取列表:

await GetSource(...).ToList();

然后像处理 GroupByToDictionary 一样处理结果 IList<T> - 请注意,您现在正在使用这些运算符的 IEnumerable<T> 实现完整列表。

此方法假定 GetSource() 可观察事件的数量和时间,并且组之间的分布使得在完成的流上计算组和字典的费用不会过高。由于最终您将返回一个完整的字典,无论如何我们都将保存在内存中,我们可能只考虑分组和创建字典条目的成本。与冗长的事件流的序列化传递相比,这在内存数据上的速度非常快,所以我倾向于认为这种方法在少数情况下并不完美。

如果它是禁止的,可能值得同时建立每个组和列表并且事件到达,在这种情况下你可以这样做:

await GetSource(...).GroupBy(x => x.SiteId)
                    .SelectMany(x => x.ToList())
                    .ToDictionary(x => x[0].SiteId);

请注意,ToDictionary keySelector 中列表的第一个元素将 始终存在(否则不会具体化任何组),因此这是安全的。它看起来确实有点奇怪,但这是我能想到的最简单的取出钥匙的方法。

或作为通用函数:

async Task<IDictionary<TKey, IList<T>>> ToDictionaryOfLists<T, TKey>(
    IObservable<T> source,
    Func<T, TKey> keySelector)
{        
    return await source.GroupBy(keySelector)
                       .SelectMany(x => x.ToList())
                       .ToDictionary(x => keySelector(x[0]));           
}

假设 class:

public class Site
{
    public int SiteId { get; set; }
}

你可以这样使用:

var result = ToDictionaryOfLists(GetSource(...), x=> x.SiteId);

编辑: 我刚刚了解到 Task 返回 Select(...) + Merge() 可以替换为 SelectMany(...) 所以更好的方法是:

IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.SelectMany(async group => (group.Key, List: await group.ToList()))
.ToDictionary(group => group.Key, group => group.List);

原文:

我刚开始学习 RX,但也许这样的东西就足够了:

IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.Select(async group => (group.Key, List: await group.ToList()))
.Merge()
.ToDictionary(group => group.Key, group => group.List);

假设网站实施如下:

public class Site
{
   public int SiteId { get; set; }
   //rest of class
}

看来您实际上并不需要 Dictionary,而是 Lookup<TKey,TElement>

Represents a collection of keys each mapped to one or more values.

通过使用内置的 RX 运算符 ToLookup 创建 Lookup 是微不足道的。 Converting LookupDictionary 的列表也很简单,通过使用标准的 LINQ 运算符 ToDictionary。以下是将这两个运算符组合在一个扩展方法中:

public static async Task<Dictionary<TKey, List<TSource>>> ToDictionaryOfLists<TSource, TKey>(
    IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
    var lookup = await source.ToLookup(keySelector);
    return lookup.ToDictionary(g => g.Key, g => g.ToList());
}