如何等待 Observable 的 Observable?
How to await on IObservable of IObservables?
我有一个 IObservable
个对象,我想将其异步转换为列表字典。
这是我的代码,其中 GetSource returns 和 IObservable
:
await GetSource(...)
.GroupBy(o => o.SiteId)
.ToDictionary(g => g.Key, g => g.ToList())
当然是错的,因为结果是IDictionary<int, IObservable<IList<T>>>
,但是我需要一个IDictionary<int, IList<T>>
本质上,我需要 await
每个 IObservable<IList<T>>
,但我不知道如何优雅地做到这一点,我相信 Rx.NET 是可能的。
有什么想法吗?
请注意,无论您做什么,在源完成之前都无法完成字典。因此,一种简单的方法是像这样异步获取列表:
await GetSource(...).ToList();
然后像处理 GroupBy
和 ToDictionary
一样处理结果 IList<T>
- 请注意,您现在正在使用这些运算符的 IEnumerable<T>
实现完整列表。
此方法假定 GetSource()
可观察事件的数量和时间,并且组之间的分布使得在完成的流上计算组和字典的费用不会过高。由于最终您将返回一个完整的字典,无论如何我们都将保存在内存中,我们可能只考虑分组和创建字典条目的成本。与冗长的事件流的序列化传递相比,这在内存数据上的速度非常快,所以我倾向于认为这种方法在少数情况下并不完美。
如果它是禁止的,可能值得同时建立每个组和列表并且事件到达,在这种情况下你可以这样做:
await GetSource(...).GroupBy(x => x.SiteId)
.SelectMany(x => x.ToList())
.ToDictionary(x => x[0].SiteId);
请注意,ToDictionary
keySelector 中列表的第一个元素将 始终存在(否则不会具体化任何组),因此这是安全的。它看起来确实有点奇怪,但这是我能想到的最简单的取出钥匙的方法。
或作为通用函数:
async Task<IDictionary<TKey, IList<T>>> ToDictionaryOfLists<T, TKey>(
IObservable<T> source,
Func<T, TKey> keySelector)
{
return await source.GroupBy(keySelector)
.SelectMany(x => x.ToList())
.ToDictionary(x => keySelector(x[0]));
}
假设 class:
public class Site
{
public int SiteId { get; set; }
}
你可以这样使用:
var result = ToDictionaryOfLists(GetSource(...), x=> x.SiteId);
编辑:
我刚刚了解到 Task 返回 Select(...) + Merge() 可以替换为 SelectMany(...) 所以更好的方法是:
IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.SelectMany(async group => (group.Key, List: await group.ToList()))
.ToDictionary(group => group.Key, group => group.List);
原文:
我刚开始学习 RX,但也许这样的东西就足够了:
IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.Select(async group => (group.Key, List: await group.ToList()))
.Merge()
.ToDictionary(group => group.Key, group => group.List);
假设网站实施如下:
public class Site
{
public int SiteId { get; set; }
//rest of class
}
看来您实际上并不需要 Dictionary
,而是 Lookup<TKey,TElement>
。
Represents a collection of keys each mapped to one or more values.
通过使用内置的 RX 运算符 ToLookup
创建 Lookup
是微不足道的。 Converting Lookup
到 Dictionary
的列表也很简单,通过使用标准的 LINQ 运算符 ToDictionary
。以下是将这两个运算符组合在一个扩展方法中:
public static async Task<Dictionary<TKey, List<TSource>>> ToDictionaryOfLists<TSource, TKey>(
IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
var lookup = await source.ToLookup(keySelector);
return lookup.ToDictionary(g => g.Key, g => g.ToList());
}
我有一个 IObservable
个对象,我想将其异步转换为列表字典。
这是我的代码,其中 GetSource returns 和 IObservable
:
await GetSource(...)
.GroupBy(o => o.SiteId)
.ToDictionary(g => g.Key, g => g.ToList())
当然是错的,因为结果是IDictionary<int, IObservable<IList<T>>>
,但是我需要一个IDictionary<int, IList<T>>
本质上,我需要 await
每个 IObservable<IList<T>>
,但我不知道如何优雅地做到这一点,我相信 Rx.NET 是可能的。
有什么想法吗?
请注意,无论您做什么,在源完成之前都无法完成字典。因此,一种简单的方法是像这样异步获取列表:
await GetSource(...).ToList();
然后像处理 GroupBy
和 ToDictionary
一样处理结果 IList<T>
- 请注意,您现在正在使用这些运算符的 IEnumerable<T>
实现完整列表。
此方法假定 GetSource()
可观察事件的数量和时间,并且组之间的分布使得在完成的流上计算组和字典的费用不会过高。由于最终您将返回一个完整的字典,无论如何我们都将保存在内存中,我们可能只考虑分组和创建字典条目的成本。与冗长的事件流的序列化传递相比,这在内存数据上的速度非常快,所以我倾向于认为这种方法在少数情况下并不完美。
如果它是禁止的,可能值得同时建立每个组和列表并且事件到达,在这种情况下你可以这样做:
await GetSource(...).GroupBy(x => x.SiteId)
.SelectMany(x => x.ToList())
.ToDictionary(x => x[0].SiteId);
请注意,ToDictionary
keySelector 中列表的第一个元素将 始终存在(否则不会具体化任何组),因此这是安全的。它看起来确实有点奇怪,但这是我能想到的最简单的取出钥匙的方法。
或作为通用函数:
async Task<IDictionary<TKey, IList<T>>> ToDictionaryOfLists<T, TKey>(
IObservable<T> source,
Func<T, TKey> keySelector)
{
return await source.GroupBy(keySelector)
.SelectMany(x => x.ToList())
.ToDictionary(x => keySelector(x[0]));
}
假设 class:
public class Site
{
public int SiteId { get; set; }
}
你可以这样使用:
var result = ToDictionaryOfLists(GetSource(...), x=> x.SiteId);
编辑: 我刚刚了解到 Task 返回 Select(...) + Merge() 可以替换为 SelectMany(...) 所以更好的方法是:
IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.SelectMany(async group => (group.Key, List: await group.ToList()))
.ToDictionary(group => group.Key, group => group.List);
原文:
我刚开始学习 RX,但也许这样的东西就足够了:
IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.Select(async group => (group.Key, List: await group.ToList()))
.Merge()
.ToDictionary(group => group.Key, group => group.List);
假设网站实施如下:
public class Site
{
public int SiteId { get; set; }
//rest of class
}
看来您实际上并不需要 Dictionary
,而是 Lookup<TKey,TElement>
。
Represents a collection of keys each mapped to one or more values.
通过使用内置的 RX 运算符 ToLookup
创建 Lookup
是微不足道的。 Converting Lookup
到 Dictionary
的列表也很简单,通过使用标准的 LINQ 运算符 ToDictionary
。以下是将这两个运算符组合在一个扩展方法中:
public static async Task<Dictionary<TKey, List<TSource>>> ToDictionaryOfLists<TSource, TKey>(
IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
var lookup = await source.ToLookup(keySelector);
return lookup.ToDictionary(g => g.Key, g => g.ToList());
}