Rx CatchAll 用于未观察到的消息

Rx CatchAll for unobserved messages

我想知道是否可以为 Rx IObservable 设置某种包罗万象的东西。

它的行为类似于:"if no other subscriber has observed this message, then do [something]"。

现在我连接了几个彼此不知道的 Observable 处理程序,并根据某些属性过滤事件。如果我们收到一条未被处理的消息,我想抛出一个错误,因为这将是一条无效消息。

我认为这是一个有趣的问题,所以我去写了一个中等规模的解决方案。我将解决方案分为三个部分:通用实现、示例用法和解释。

实施

public interface ITracked<out T>
{
    T Value { get; }
    bool IsObserved { get; }
    void Observe();
}

public class Tracked<T> : ITracked<T>
{
    private readonly T value;
    public Tracked(T value)
    {
        this.value = value;
    }
    public T Value
    {
        get { return value; }
    }
    public bool IsObserved { get; private set; }
    public void Observe()
    {
        IsObserved = true;
    }
}

public interface ITrackableObservable<out T> : IObservable<ITracked<T>>
{
    IObservable<T> Unobserved { get; }
}

public class TrackableObservable<T> : ITrackableObservable<T>
{
    private readonly ISubject<T> unobserved = new Subject<T>();
    private readonly IObservable<ITracked<T>> source;

    public TrackableObservable(IObservable<T> source)
    {
        this.source = Observable
            .Create<ITracked<T>>(observer => source.Subscribe(
                value =>
                {
                    var trackedValue = new Tracked<T>(value);
                    observer.OnNext(trackedValue);
                    if (!trackedValue.IsObserved)
                    {
                        unobserved.OnNext(value);
                    }
                },
                observer.OnError,
                observer.OnCompleted))
            .Publish()
            .RefCount();
    }

    public IObservable<T> Unobserved
    {
        get { return unobserved.AsObservable(); }
    }

    public IDisposable Subscribe(IObserver<ITracked<T>> observer)
    {
        return source.Subscribe(observer);
    }
}

public static class TrackableObservableExtensions
{
    public static ITrackableObservable<T> ToTrackableObservable<T>(this IObservable<T> source)
    {
        return new TrackableObservable<T>(source);
    }

    public static IObservable<T> Observe<T>(this IObservable<ITracked<T>> source)
    {
        return source.Do(x => x.Observe()).Select(x => x.Value);
    }

    public static IObservable<T> ObserveWhere<T>(this IObservable<ITracked<T>> source, Func<T, bool> predicate)
    {
        return source.Where(x => predicate(x.Value)).Observe();
    }
}

例子

public class Animal
{
    public int ID { get; set; }
    public string Kind { get; set; }
    public string Name { get; set; }
}

...

IObservable<Animal> animals = ...;
ITrackableObservable<Animal> trackableAnimals = animals.ToTrackableObservable();
trackableAnimals
    .ObserveWhere(a => a.Kind == "Cat")
    .Subscribe(a => Console.WriteLine("{0}: Meow", a.ID));
trackableAnimals
    .ObserveWhere(a => a.Kind == "Dog")
    .Subscribe(a => Console.WriteLine("{0}: Woof", a.ID));
trackableAnimals
    .ObserveWhere(a => a.Name != null)
    .Subscribe(a => Console.WriteLine("{0}: {1} named {2}", a.ID, a.Kind, a.Name));
trackableAnimals
    .Unobserved
    .Subscribe(a => Console.WriteLine("{0}: {1} with no name (unobserved)", a.ID, a.Kind));

如果 animals 发出这个序列:

new Animal { ID = 1, Kind = "Cat", Name = "Rusty" }
new Animal { ID = 2, Kind = "Horse" }
new Animal { ID = 3, Kind = "Dog", Name = "Fido" }
new Animal { ID = 4, Kind = "Dog" }
new Animal { ID = 5, Kind = "Bird", Name = "Simon" }

然后我们会看到这个输出:

1: Meow
1: Cat named Rusty
2: Horse with no name (unobserved)
3: Woof
3: Dog named Fido
4: Woof
5: Bird named Simon

说明

这里的想法是确保所有订阅者最终共享对源序列的单一订阅,并且每个值都有一个 bool 附加到它,说明该值是否已被观察到。每当源序列发出 T 时,我们将其包装为 ITracked<T>,然后将该单个实例传递给所有订阅者。然后观察者可以选择将值标记为观察到的值。一旦对观察者的所有 OnNext 调用都已返回,如果 ITracked<T> 未标记为已观察,那么我们知道它未被观察。

TrackableObservableExtensions class 提供了一些扩展方法,使实现的工作更加流畅,但它们不是实现的一部分所必需的。