A cache serving updates and new values as "DistinctLatest" and full cache contents upon subscription

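I'm trying to implement a cache using a ReplaySubject as shown below, but I can't work out how to do it with Rx. See the code and the accompanying test. The problem is that the cache drops the newest entries and keeps the oldest ones.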

public static class RxExtensions
{
    /// <summary>
    /// A cache that keeps distinct elements where the elements are replaced by the latest. Upon subscription the subscriber should receive the full cache contents.
    /// </summary>
    /// <typeparam name="T">The type of the result</typeparam>
    /// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
    /// <param name="newElements">The sequence of new elements.</param>
    /// <param name="seedElements">The elements when the cache is started.</param>
    /// <param name="replacementSelector">The replacement to select distinct elements in the cache.</param>
    /// <returns>The cache contents upon first call and changes thereafter.</returns>
    public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
    {
        var replaySubject = new ReplaySubject<T>();
        seedElements.ToObservable().Concat(newElements).Subscribe(replaySubject);

        return replaySubject.Distinct(replacementSelector);
    }
}

It looks like the old seed values would be dropped if I wrote the function like this instead:

newElements.Subscribe(replaySubject);
return replaySubject.Concat(seedElements.ToObservable()).Distinct(replacementSelector);

But given how I think .Concat works, this probably "works" only because of how the test is currently set up; see the test below.

public void CacheTests()
{
    var seedElements = new List<Event>(new[]
    {
        new Event { Id = 0, Batch = 1 },
        new Event { Id = 1, Batch = 1 },
        new Event { Id = 2, Batch = 1 }
    });

    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<Event>();
    var batchTicks = TimeSpan.FromSeconds(10);
    var xs = testScheduler.CreateHotObservable
    (
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 0, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 1, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 2, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 3, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks, new Event { Id = 4, Batch = 2 }),
        ReactiveTest.OnNext(batchTicks.Ticks + 10, new Event { Id = 0, Batch = 3 }),
        ReactiveTest.OnNext(batchTicks.Ticks + 10, new Event { Id = 1, Batch = 3 })
    );

    var subs = xs.Cache(seedElements, i => i.Id).Subscribe(observer);
    var seedElementsAndNoMore = observer.Messages.ToArray();
    Assert.IsTrue(observer.Messages.Count == 3);

    testScheduler.Start();
    var seedAndReplacedElements = observer.Messages.ToArray();

    //OK, a bad assert, we should create expected timings and want to check
    //also the actual batch numbers, but to get things going...
    //There should be Events with IDs { 0, 1, 2, 3, 4 } all having a batch number
    //of either 2 or 3. Also, a total of 7 (not 10) events
    //should've been observed.
    Assert.IsTrue(observer.Messages.Count == 7);
    for(int i = 0; i < seedAndReplacedElements.Length; ++i)
    {
        Assert.IsTrue(seedAndReplacedElements[i].Value.Value.Batch > 1);
    }
}

I think what I want is something like this:

public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
    var replaySubject = new ReplaySubject<T>();
    newElements.StartWith(seedElements).Distinct(replacementSelector).Subscribe(replaySubject);

    return replaySubject;           
}

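But the problem is that the seed values arrive first, so Rx drops the newer values rather than the seed values. Doing it the other way around (perhaps using .Merge) could introduce the seed into the observable after new values have already been received, in which case the seed values would not actually be replaced.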
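
To make the behavior described above concrete, here is a minimal sketch of how Distinct treats a repeated key. The `DistinctDemo` class and the tuple-based events are illustrative stand-ins for the `Event` type in the question, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DistinctDemo
{
    // Hypothetical helper: feeds seed values followed by updates through Distinct.
    public static List<(int Id, int Batch)> Run()
    {
        var seen = new List<(int Id, int Batch)>();
        var seed = new[] { (Id: 0, Batch: 1), (Id: 1, Batch: 1) };
        var updates = new[] { (Id: 0, Batch: 2), (Id: 2, Batch: 2) };

        seed.ToObservable()
            .Concat(updates.ToObservable())
            // Distinct passes the first element seen for each key and suppresses later ones.
            .Distinct(e => e.Id)
            .Subscribe(e => seen.Add(e));

        return seen;
    }
}
```

Because the seed for Id 0 arrives first, the later update for Id 0 should be the one that is suppressed, which is the opposite of what a cache needs.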

This isn't an answer, but rather a clarification of your question.

I'm having a hard time understanding the use case. As @ibebbs pointed out, Distinct doesn't work that way. It looks like you want something like DistinctLatest.

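Here's a marble diagram for your test. '|' in this diagram represents subscription, not completion. Also, assume new is a hot observable, s1 is a subscriber that subscribed at roughly t=20, and s2 is a subscriber that subscribed at roughly t=1: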

   t: ------------0--------------10--------------------20------
seed: (10)(11)(12)---------------------------------------------
 new: ---------------------------(20)(21)(22)(23)(24)--(30)(31)
  s1:                                                  |(30)(31)(22)(23)(24)
  s2:              |(10)(11)(12)-(20)(21)(22)(23)(24)--(30)(31)

Is this what you want?
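
As a rough sketch of the "latest per key" snapshot that a late subscriber such as s1 would expect, leaving Rx aside entirely: the `LatestPerKeyDemo` class and its tuple events are assumptions made for illustration only.

```csharp
using System.Collections.Generic;

public static class LatestPerKeyDemo
{
    // Hypothetical helper: folds events into a dictionary where the last write per Id wins.
    public static Dictionary<int, int> Snapshot(IEnumerable<(int Id, int Batch)> events)
    {
        var latest = new Dictionary<int, int>();
        foreach (var e in events)
        {
            // The indexer overwrites any earlier value stored for the same Id.
            latest[e.Id] = e.Batch;
        }
        return latest;
    }
}
```

Fed the three seeds and the seven updates from the test, this would hold batch 3 for Ids 0 and 1 and batch 2 for Ids 2, 3 and 4, which corresponds to the s1 row in the diagram.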


EDIT:

An answer taken from @LeeCampbell's comment:

public static class RxExtensions
{
    public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
    {
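        return seedElements.ToObservable()
            .Concat(newElements)
            // Group by the key the selector returns, not by the selector delegate itself.
            .GroupBy(replacementSelector)
            // Replay(1) only emits once connected, so RefCount is applied to it directly.
            .SelectMany(grp => grp.Replay(1).RefCount());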
    }
}

OK, I think I have what you want. I worked out your requirements mainly from the following phrase:

When the subscriber subscribes to this cache, it gets all the values held in the cache as the first thing and then after that updates as they come in

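I believe this expresses a desire for the cache to have a lifetime beyond a single subscription (i.e. it should be started once, and subscribers can come and go as they please), which makes it an IConnectableObservable. This is implicit in your code, but not correctly scoped.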
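
To illustrate what "a lifetime beyond a single subscription" means in practice, here is a minimal sketch using Publish and Connect on a Subject. `ConnectDemo` is a hypothetical name and the integers stand in for cache events.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ConnectDemo
{
    // Hypothetical helper: shows that Connect, not Subscribe, controls the source lifetime.
    public static (List<int> A, List<int> B) Run()
    {
        var a = new List<int>();
        var b = new List<int>();
        var source = new Subject<int>();
        IConnectableObservable<int> shared = source.Publish();

        shared.Subscribe(x => a.Add(x));
        source.OnNext(1); // dropped: the connectable has not been connected yet

        using (shared.Connect())
        {
            source.OnNext(2);                 // reaches subscriber A only
            shared.Subscribe(x => b.Add(x));  // B joins while the connection is live
            source.OnNext(3);                 // reaches both A and B
        }

        source.OnNext(4); // dropped: the connection has been disposed
        return (a, b);
    }
}
```

The point of the sketch is that subscribers join and leave independently of the connection, which is the scoping the cache needs.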

I've also refactored your test to show multiple subscribers (per @Shlomo's comment), as follows:

[Fact]
public void ReplayAllElements()
{
    var seedElements = new List<Event>(new[]
    {
        new Event { Id = 0, Batch = 1 },
        new Event { Id = 1, Batch = 1 },
        new Event { Id = 2, Batch = 1 }
    });

    var testScheduler = new TestScheduler();

    var xs = testScheduler.CreateHotObservable
    (
        ReactiveTest.OnNext(1, new Event { Id = 0, Batch = 2 }),
        ReactiveTest.OnNext(2, new Event { Id = 1, Batch = 2 }),
        ReactiveTest.OnNext(3, new Event { Id = 2, Batch = 2 }),
        ReactiveTest.OnNext(4, new Event { Id = 3, Batch = 2 }),
        ReactiveTest.OnNext(5, new Event { Id = 4, Batch = 2 }),    
        ReactiveTest.OnNext(6, new Event { Id = 0, Batch = 3 }),
        ReactiveTest.OnNext(7, new Event { Id = 1, Batch = 3 })
    );

    IConnectableObservable<Event> cached = xs.Cache(seedElements, i => i.Id);

    var observerA = testScheduler.CreateObserver<Event>();
    cached.Subscribe(observerA);
    cached.Connect();

    testScheduler.AdvanceTo(4);

    var observerB = testScheduler.CreateObserver<Event>();
    cached.Subscribe(observerB);

    testScheduler.AdvanceTo(7);

    var expectedA = new[]
    {
        ReactiveTest.OnNext<Event>(0, @event => @event.Id == 0 && @event.Batch == 1 ),
        ReactiveTest.OnNext<Event>(0, @event => @event.Id == 1 && @event.Batch == 1 ),
        ReactiveTest.OnNext<Event>(0, @event => @event.Id == 2 && @event.Batch == 1 ),
        ReactiveTest.OnNext<Event>(1, @event => @event.Id == 0 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(2, @event => @event.Id == 1 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(3, @event => @event.Id == 2 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(4, @event => @event.Id == 3 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 4 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(6, @event => @event.Id == 0 && @event.Batch == 3 ),
        ReactiveTest.OnNext<Event>(7, @event => @event.Id == 1 && @event.Batch == 3 )
    };

    observerA.Messages.AssertEqual(expectedA);

    var expectedB = new[]
    {
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 0 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 1 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 2 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 3 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(5, @event => @event.Id == 4 && @event.Batch == 2 ),
        ReactiveTest.OnNext<Event>(6, @event => @event.Id == 0 && @event.Batch == 3 ),
        ReactiveTest.OnNext<Event>(7, @event => @event.Id == 1 && @event.Batch == 3 )
    };

    observerB.Messages.AssertEqual(expectedB);
}

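As you can see, observerA gets all the seed values and updates, while observerB gets only the latest value for each key and then any further updates.

The code that does this is as follows: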

public static class RxExtensions
{
    /// <summary>
    /// A cache that keeps distinct elements where the elements are replaced by the latest.
    /// </summary>
    /// <typeparam name="T">The type of the result</typeparam>
    /// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
    /// <param name="newElements">The sequence of new elements.</param>
    /// <param name="seedElements">The elements when the cache is started.</param>
    /// <param name="keySelector">The replacement to select distinct elements in the cache.</param>
    /// <returns>The cache contents upon first call and changes thereafter.</returns>
    public static IConnectableObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> keySelector)
    {
        return new Cache<TKey, T>(newElements, seedElements, keySelector);
    }
}

public class Cache<TKey, T> : IConnectableObservable<T>
{
    private class State
    {
        public ImmutableDictionary<TKey, T> Cache { get; set; }
        public T Value { get; set; }
    }

    private readonly IConnectableObservable<State> _source;
    private readonly IObservable<T> _observable;

    public Cache(IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> keySelector)
    {
        var agg = new State { Cache = seedElements.ToImmutableDictionary(keySelector), Value = default(T) };

        _source = newElements
            // Use the Scan operator to update the dictionary of values based on key, and use the State object to pass both the dictionary and the current item to the next operator
            .Scan(agg, (tuple, item) => new State { Cache = tuple.Cache.SetItem(keySelector(item), item), Value = item })
            // Ensure we always have at least one item
            .StartWith(agg)
            // Share this single subscription to the above with all subscribers
            .Publish();

        _observable = _source.Publish(source =>
                // ... concatting ...
                Observable.Concat(
                    // ... getting a single collection of values from the cache and flattening it to a series of values ...
                    source.Select(tuple => tuple.Cache.Values).Take(1).SelectMany(values => values),
                    // ... and then returning the values as they're emitted from the source
                    source.Select(tuple => tuple.Value)
                )
            );
    }

    public IDisposable Connect()
    {
        return _source.Connect();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _observable.Subscribe(observer);
    }
}
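
The Scan and StartWith steps in the constructor can be looked at in isolation. The following is a simplified sketch, assuming integer keys and values instead of `State` and `Event`; `ScanDemo` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Reactive.Linq;

public static class ScanDemo
{
    // Hypothetical helper: collects every dictionary snapshot the pipeline emits.
    public static List<ImmutableDictionary<int, int>> Run()
    {
        var seed = ImmutableDictionary<int, int>.Empty.Add(0, 1).Add(1, 1);
        var updates = new[] { (Id: 0, Batch: 2), (Id: 3, Batch: 2) };
        var snapshots = new List<ImmutableDictionary<int, int>>();

        updates.ToObservable()
            // SetItem returns a new dictionary with the key added or replaced.
            .Scan(seed, (cache, e) => cache.SetItem(e.Id, e.Batch))
            // Scan does not emit its seed, so StartWith supplies the initial snapshot.
            .StartWith(seed)
            .Subscribe(s => snapshots.Add(s));

        return snapshots;
    }
}
```

Because the dictionary is immutable, each snapshot stays valid after later updates, so a new subscriber can safely be handed whichever snapshot is current.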

Certainly an interesting problem. The key to the answer is this Publish overload:

    // Summary:
    //     Returns an observable sequence that is the result of invoking the selector on
    //     a connectable observable sequence that shares a single subscription to the underlying
    //     sequence. This operator is a specialization of Multicast using a regular System.Reactive.Subjects.Subject`1.
    //
    // Parameters:
    //   source:
    //     Source sequence whose elements will be multicasted through a single shared subscription.
    //
    //   selector:
    //     Selector function which can use the multicasted source sequence as many times
    //     as needed, without causing multiple subscriptions to the source sequence. Subscribers
    //     to the given source will receive all notifications of the source from the time
    //     of the subscription on.
    //
    // Type parameters:
    //   TSource:
    //     The type of the elements in the source sequence.
    //
    //   TResult:
    //     The type of the elements in the result sequence.
    //
    // Returns:
    //     An observable sequence that contains the elements of a sequence produced by multicasting
    //     the source sequence within a selector function.
    //
    // Exceptions:
    //   T:System.ArgumentNullException:
    //     source or selector is null.
    public static IObservable<TResult> Publish<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
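
As a small sketch of what this overload provides, the selector below uses the shared sequence twice while the underlying source is subscribed to only once. `PublishSelectorDemo` and the counting Defer wrapper are illustrative assumptions, not part of the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishSelectorDemo
{
    // Hypothetical helper: returns the pairwise differences and the source subscription count.
    public static (List<int> Diffs, int Subscriptions) Run()
    {
        var subscriptions = 0;
        var diffs = new List<int>();

        // Defer runs its factory on every subscription, so the counter tracks subscriptions.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        source
            // Inside the selector, 'shared' can be used repeatedly without re-subscribing to source.
            .Publish(shared => shared.Zip(shared.Skip(1), (prev, cur) => cur - prev))
            .Subscribe(d => diffs.Add(d));

        return (diffs, subscriptions);
    }
}
```

The Cache class relies on the same property: both branches of the Concat read from one shared subscription to the published state.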

Anyway, I hope this helps.