Rx CatchAll 用于未观察到的消息
Rx CatchAll for unobserved messages
我想知道是否可以为 Rx IObservable
设置某种包罗万象的东西。
它的行为类似于:"if no other subscriber has observed this message, then do [something]"。
现在我连接了几个彼此不知道的 Observable 处理程序,并根据某些属性过滤事件。如果我们收到一条未被处理的消息,我想抛出一个错误,因为这将是一条无效消息。
我认为这是一个有趣的问题,所以我去写了一个中等规模的解决方案。我将解决方案分为三个部分:通用实现、示例用法和解释。
实施
public interface ITracked<out T>
{
T Value { get; }
bool IsObserved { get; }
void Observe();
}
public class Tracked<T> : ITracked<T>
{
private readonly T value;
public Tracked(T value)
{
this.value = value;
}
public T Value
{
get { return value; }
}
public bool IsObserved { get; private set; }
public void Observe()
{
IsObserved = true;
}
}
public interface ITrackableObservable<out T> : IObservable<ITracked<T>>
{
IObservable<T> Unobserved { get; }
}
public class TrackableObservable<T> : ITrackableObservable<T>
{
private readonly ISubject<T> unobserved = new Subject<T>();
private readonly IObservable<ITracked<T>> source;
public TrackableObservable(IObservable<T> source)
{
this.source = Observable
.Create<ITracked<T>>(observer => source.Subscribe(
value =>
{
var trackedValue = new Tracked<T>(value);
observer.OnNext(trackedValue);
if (!trackedValue.IsObserved)
{
unobserved.OnNext(value);
}
},
observer.OnError,
observer.OnCompleted))
.Publish()
.RefCount();
}
public IObservable<T> Unobserved
{
get { return unobserved.AsObservable(); }
}
public IDisposable Subscribe(IObserver<ITracked<T>> observer)
{
return source.Subscribe(observer);
}
}
public static class TrackableObservableExtensions
{
public static ITrackableObservable<T> ToTrackableObservable<T>(this IObservable<T> source)
{
return new TrackableObservable<T>(source);
}
public static IObservable<T> Observe<T>(this IObservable<ITracked<T>> source)
{
return source.Do(x => x.Observe()).Select(x => x.Value);
}
public static IObservable<T> ObserveWhere<T>(this IObservable<ITracked<T>> source, Func<T, bool> predicate)
{
return source.Where(x => predicate(x.Value)).Observe();
}
}
例子
public class Animal
{
public int ID { get; set; }
public string Kind { get; set; }
public string Name { get; set; }
}
...
IObservable<Animal> animals = ...;
ITrackableObservable<Animal> trackableAnimals = animals.ToTrackableObservable();
trackableAnimals
.ObserveWhere(a => a.Kind == "Cat")
.Subscribe(a => Console.WriteLine("{0}: Meow", a.ID));
trackableAnimals
.ObserveWhere(a => a.Kind == "Dog")
.Subscribe(a => Console.WriteLine("{0}: Woof", a.ID));
trackableAnimals
.ObserveWhere(a => a.Name != null)
.Subscribe(a => Console.WriteLine("{0}: {1} named {2}", a.ID, a.Kind, a.Name));
trackableAnimals
.Unobserved
.Subscribe(a => Console.WriteLine("{0}: {1} with no name (unobserved)", a.ID, a.Kind));
如果 animals
发出这个序列:
new Animal { ID = 1, Kind = "Cat", Name = "Rusty" }
new Animal { ID = 2, Kind = "Horse" }
new Animal { ID = 3, Kind = "Dog", Name = "Fido" }
new Animal { ID = 4, Kind = "Dog" }
new Animal { ID = 5, Kind = "Bird", Name = "Simon" }
然后我们会看到这个输出:
1: Meow
1: Cat named Rusty
2: Horse with no name (unobserved)
3: Woof
3: Dog named Fido
4: Woof
5: Bird named Simon
说明
这里的想法是确保所有订阅者最终共享对源序列的单一订阅,并且每个值都有一个 bool
附加到它,说明该值是否已被观察到。每当源序列发出 T
时,我们将其包装为 ITracked<T>
,然后将该单个实例传递给所有订阅者。然后观察者可以选择将值标记为观察到的值。一旦对观察者的所有 OnNext
调用都已返回,如果 ITracked<T>
未标记为已观察,那么我们知道它未被观察。
TrackableObservableExtensions
class 提供了一些扩展方法,使实现的工作更加流畅,但它们不是实现的一部分所必需的。
我想知道是否可以为 Rx IObservable
设置某种包罗万象的东西。
它的行为类似于:"if no other subscriber has observed this message, then do [something]"。
现在我连接了几个彼此不知道的 Observable 处理程序,并根据某些属性过滤事件。如果我们收到一条未被处理的消息,我想抛出一个错误,因为这将是一条无效消息。
我认为这是一个有趣的问题,所以我去写了一个中等规模的解决方案。我将解决方案分为三个部分:通用实现、示例用法和解释。
实施
public interface ITracked<out T>
{
T Value { get; }
bool IsObserved { get; }
void Observe();
}
public class Tracked<T> : ITracked<T>
{
private readonly T value;
public Tracked(T value)
{
this.value = value;
}
public T Value
{
get { return value; }
}
public bool IsObserved { get; private set; }
public void Observe()
{
IsObserved = true;
}
}
public interface ITrackableObservable<out T> : IObservable<ITracked<T>>
{
IObservable<T> Unobserved { get; }
}
public class TrackableObservable<T> : ITrackableObservable<T>
{
private readonly ISubject<T> unobserved = new Subject<T>();
private readonly IObservable<ITracked<T>> source;
public TrackableObservable(IObservable<T> source)
{
this.source = Observable
.Create<ITracked<T>>(observer => source.Subscribe(
value =>
{
var trackedValue = new Tracked<T>(value);
observer.OnNext(trackedValue);
if (!trackedValue.IsObserved)
{
unobserved.OnNext(value);
}
},
observer.OnError,
observer.OnCompleted))
.Publish()
.RefCount();
}
public IObservable<T> Unobserved
{
get { return unobserved.AsObservable(); }
}
public IDisposable Subscribe(IObserver<ITracked<T>> observer)
{
return source.Subscribe(observer);
}
}
public static class TrackableObservableExtensions
{
public static ITrackableObservable<T> ToTrackableObservable<T>(this IObservable<T> source)
{
return new TrackableObservable<T>(source);
}
public static IObservable<T> Observe<T>(this IObservable<ITracked<T>> source)
{
return source.Do(x => x.Observe()).Select(x => x.Value);
}
public static IObservable<T> ObserveWhere<T>(this IObservable<ITracked<T>> source, Func<T, bool> predicate)
{
return source.Where(x => predicate(x.Value)).Observe();
}
}
例子
public class Animal
{
public int ID { get; set; }
public string Kind { get; set; }
public string Name { get; set; }
}
...
IObservable<Animal> animals = ...;
ITrackableObservable<Animal> trackableAnimals = animals.ToTrackableObservable();
trackableAnimals
.ObserveWhere(a => a.Kind == "Cat")
.Subscribe(a => Console.WriteLine("{0}: Meow", a.ID));
trackableAnimals
.ObserveWhere(a => a.Kind == "Dog")
.Subscribe(a => Console.WriteLine("{0}: Woof", a.ID));
trackableAnimals
.ObserveWhere(a => a.Name != null)
.Subscribe(a => Console.WriteLine("{0}: {1} named {2}", a.ID, a.Kind, a.Name));
trackableAnimals
.Unobserved
.Subscribe(a => Console.WriteLine("{0}: {1} with no name (unobserved)", a.ID, a.Kind));
如果 animals
发出这个序列:
new Animal { ID = 1, Kind = "Cat", Name = "Rusty" }
new Animal { ID = 2, Kind = "Horse" }
new Animal { ID = 3, Kind = "Dog", Name = "Fido" }
new Animal { ID = 4, Kind = "Dog" }
new Animal { ID = 5, Kind = "Bird", Name = "Simon" }
然后我们会看到这个输出:
1: Meow
1: Cat named Rusty
2: Horse with no name (unobserved)
3: Woof
3: Dog named Fido
4: Woof
5: Bird named Simon
说明
这里的想法是确保所有订阅者最终共享对源序列的单一订阅,并且每个值都有一个 bool
附加到它,说明该值是否已被观察到。每当源序列发出 T
时,我们将其包装为 ITracked<T>
,然后将该单个实例传递给所有订阅者。然后观察者可以选择将值标记为观察到的值。一旦对观察者的所有 OnNext
调用都已返回,如果 ITracked<T>
未标记为已观察,那么我们知道它未被观察。
TrackableObservableExtensions
class 提供了一些扩展方法,使实现的工作更加流畅,但它们不是实现的一部分所必需的。