应该如何在 Rx 中实现 DistinctLatest(和缓存)运算符?
How should one go about implementing a DistinctLatest (and caching) operator in Rx?
我有一个问题 ,社区很好地解决了这个问题。提出了一个问题,即缓存和替换值的实际目标(如上述问题中定义的)可以用 .DistinctLatest
运算符定义。
好的!似乎没有太多人谈论这样的运营商。一边搜索,一边思考,我发现 ,有点接近了。为了模仿原始问题,我尝试将缓存运算符写为
/// <summary>
/// A cache that keeps distinct elements where the elements are replaced by the latest.
/// </summary>
/// <typeparam name="T">The type of the result</typeparam>
/// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
/// <param name="newElements">The sequence of new elements.</param>
/// <param name="seedElements">The seed elements when the cache is started.</param>
/// <param name="replacementSelector">The replacement selector to choose distinct elements in the cache.</param>
/// <returns>The cache contents upon first call and changes thereafter.</returns>
public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
var s = newElements.StartWith(seedElements).GroupBy(replacementSelector).Select(groupObservable =>
{
var replaySubject = new ReplaySubject<T>(1);
groupObservable.Subscribe(value => replaySubject.OnNext(value));
return replaySubject;
});
return s.SelectMany(i => i);
}
但是进行测试似乎也无济于事。看起来如果有人在一开始就订阅了初始值和更新(以及新值)。而如果最后订阅,则只记录替换的种子值。
现在,我想知道一个通用的 DistinctLast
运算符,我认为是这样,但它不起作用,然后这个 "cache" 添加的是种子值和组的扁平化,但这不是测试所告诉的。我也尝试了一些分组和 .TakeLast()
的东西,但没有骰子。
如果有人对此有指点或思考,我会很高兴,希望这会成为普遍有益的事情。
@LeeCampbell 为此做了大部分工作。请参阅其他参考问题。无论如何,这是代码:
public static class RxExtensions
{
public static IObservable<T> DistinctLatest<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
return seedElements.ToObservable()
.Concat(newElements)
.GroupBy(i => replacementSelector)
.SelectMany(grp => grp.Replay(1).Publish().RefCount());
}
}
我有一个问题 .DistinctLatest
运算符定义。
好的!似乎没有太多人谈论这样的运营商。一边搜索,一边思考,我发现
/// <summary>
/// A cache that keeps distinct elements where the elements are replaced by the latest.
/// </summary>
/// <typeparam name="T">The type of the result</typeparam>
/// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
/// <param name="newElements">The sequence of new elements.</param>
/// <param name="seedElements">The seed elements when the cache is started.</param>
/// <param name="replacementSelector">The replacement selector to choose distinct elements in the cache.</param>
/// <returns>The cache contents upon first call and changes thereafter.</returns>
public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
var s = newElements.StartWith(seedElements).GroupBy(replacementSelector).Select(groupObservable =>
{
var replaySubject = new ReplaySubject<T>(1);
groupObservable.Subscribe(value => replaySubject.OnNext(value));
return replaySubject;
});
return s.SelectMany(i => i);
}
但是进行测试似乎也无济于事。看起来如果有人在一开始就订阅了初始值和更新(以及新值)。而如果最后订阅,则只记录替换的种子值。
现在,我想知道一个通用的 DistinctLast
运算符,我认为是这样,但它不起作用,然后这个 "cache" 添加的是种子值和组的扁平化,但这不是测试所告诉的。我也尝试了一些分组和 .TakeLast()
的东西,但没有骰子。
如果有人对此有指点或思考,我会很高兴,希望这会成为普遍有益的事情。
@LeeCampbell 为此做了大部分工作。请参阅其他参考问题。无论如何,这是代码:
public static class RxExtensions
{
public static IObservable<T> DistinctLatest<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
return seedElements.ToObservable()
.Concat(newElements)
.GroupBy(i => replacementSelector)
.SelectMany(grp => grp.Replay(1).Publish().RefCount());
}
}