观察其他观察者看不到的值

Observe values not seen in other observers

我有一个发出唯一值的可观察对象,例如

var source=Observable.Range(1,100).Publish();
source.Connect();

我想观察它的值,例如两个观察者,但每个观察者只收到其他观察者看不到的值的通知。

因此,如果第一个观察者包含值 10,则第二个观察者永远不会收到关于值 10 的通知。

更新

我选择了@Asti 的答案,因为它是第一个,虽然它指出了正确的方向并支持@Shlomo 的答案。太糟糕了,我不能同时接受这两个答案,因为@Shlomo 的答案更正确,我非常感谢他为我们在这个标签上提供的所有帮助。

Observables 不应该对不同的观察者有不同的行为;更好的方法是为每个观察者提供自己的过滤观察值。

也就是说,如果您的约束要求您需要在单个可观察对象中执行此行为 - 我们可以使用循环法。

    public static IEnumerable<T> Repeat<T>(this IEnumerable<T> source)
    {
        for (; ; )
            foreach (var item in source.ToArray())
                yield return item;
    }

    public static IObservable<T> RoundRobin<T>(this IObservable<T> source)
    {
        var subscribers = new List<IObserver<T>>();
        var shared = source
            .Zip(subscribers.Repeat(), (value, observer) => (value, observer))
            .Publish()
            .RefCount();

        return Observable.Create<T>(observer =>
        {
            subscribers.Add(observer);
            var subscription = 
                shared
                .Where(pair => pair.observer == observer)
                .Select(pair => pair.value)
                .Subscribe(observer);

            var dispose = Disposable.Create(() => subscribers.Remove(observer));
            return new CompositeDisposable(subscription, dispose);
        });
    }

用法:

var source = Observable.Range(1, 100).Publish();
var dist = source.RoundRobin();
dist.Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Subscribe(i => Console.WriteLine($"Two sees {i}"));

source.Connect();

结果:

One sees 1
Two sees 2
One sees 3
Two sees 4
One sees 5
Two sees 6
One sees 7
Two sees 8
One sees 9
Two sees 10

如果你已经有了一个观察者列表,代码就会简单很多。

编辑:@Asti 修复了他的错误,我根据他的回答修复了我的错误。我们的答案现在大体相似。我有一个想法如何做一个纯粹的反应式,如果我有时间我会 post 稍后。

固定码:

public static IObservable<T> RoundRobin2<T>(this IObservable<T> source)
{
    var subscribers = new BehaviorSubject<ImmutableList<IObserver<T>>>(ImmutableList<IObserver<T>>.Empty);
    ImmutableList<IObserver<T>> latest = ImmutableList<IObserver<T>>.Empty;
    subscribers.Subscribe(l => latest = l);

    var shared = source
            .Select((v, i) => (v, i))
            .WithLatestFrom(subscribers, (t, s) => (t.v, t.i, s))
            .Publish()
            .RefCount();
    return Observable.Create<T>(observer =>
    {
        subscribers.OnNext(latest.Add(observer));
        var dispose = Disposable.Create(() => subscribers.OnNext(latest.Remove(observer)));

        var sub = shared
            .Where(t => t.i % t.s.Count == t.s.FindIndex(o => o == observer))
            .Select(t => t.v)
            .Subscribe(observer);

        return new CompositeDisposable(dispose, sub);
    });
}

原回答: 我赞成@Asti 的回答,因为他基本上是正确的:仅仅因为你可以,并不意味着你应该。他的回答基本上有效,但存在错误:

这很好用:

var source = Observable.Range(1, 20).Publish();
var dist = source.RoundRobin();
dist.Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Take(1).Subscribe(i => Console.WriteLine($"Two sees {i}"));

这不是:

var source = Observable.Range(1, 20).Publish();
var dist = source.RoundRobin();
dist.Take(1).Subscribe(i => Console.WriteLine($"One sees {i}"));
dist.Subscribe(i => Console.WriteLine($"Two sees {i}"));

输出为:

One sees 1
Two sees 1
Two sees 2
Two sees 3
Two sees 4
...

我最初以为是 Halloween related 的错误,但现在我不确定了。 Repeat 中的 .ToArray() 应该解决这个问题。我还编写了一个具有相同错误的纯 ish 可观察实现。此实现不能保证完美的循环,但这不是问题所在:

public static IObservable<T> RoundRobin2<T>(this IObservable<T> source)
{
    var subscribers = new BehaviorSubject<ImmutableList<IObserver<T>>>(ImmutableList<IObserver<T>>.Empty);
    ImmutableList<IObserver<T>> latest = ImmutableList<IObserver<T>>.Empty;
    subscribers.Subscribe(l => latest = l);

    var shared = source
            .Select((v, i) => (v, i))
            .WithLatestFrom(subscribers, (t, s) => (t.v, t.i, s))
            .Publish()
            .RefCount();
    return Observable.Create<T>(observer =>
    {
        subscribers.OnNext(latest.Add(observer));
        var dispose = Disposable.Create(() => subscribers.OnNext(latest.Remove(observer)));

        var sub = shared
            .Where(t => t.i % t.s.Count == t.s.FindIndex(o => o == observer))
            .Select(t => t.v)
            .Subscribe(observer);

        return new CompositeDisposable(dispose, sub);
    });
}

这是一个使用 TPL 数据流的简单分布式队列实现。但是对于不同的观察者没有看到相同的价值,它表现不正确的可能性很小。它不是循环法,但实际上具有背压语义。

    public static IObservable<T> Distribute<T>(this IObservable<T> source)
    {
        var buffer = new BufferBlock<T>();
        source.Subscribe(buffer.AsObserver());             
        return Observable.Create<T>(observer =>
            buffer.LinkTo(new ActionBlock<T>(observer.OnNext, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 })
        );
    }

输出

One sees 1
Two sees 2
One sees 3
Two sees 4
One sees 5
One sees 6
One sees 7
One sees 8
One sees 9
One sees 10

我可能更喜欢完全跳过 Rx,只使用 TPL Dataflow。