Rx – 与超时不同?
Rx – Distinct with timeout?
我想知道是否有任何方法可以在 .NET 的响应式扩展中实现 Distinct,使其在给定时间内工作,并且在这段时间之后它应该重置并允许再次返回值。我需要这个用于应用程序中的热源,它将工作一整年,现在停止,所以我担心性能,我需要在一段时间后允许这些值。还有 DistinctUntilChanged,但在我的例子中,值可以混合使用——例如:A A X A,DistinctUntilChanged 会给我 A X A,我需要结果 A X,在给定时间后,distinct 应该被重置。
使用包装器 class 为项目添加时间戳,但不考虑用于散列或相等性的时间戳(created
字段):
public class DistinctForItem<T> : IEquatable<DistinctForItem<T>>
{
private readonly T item;
private DateTime created;
public DistinctForItem(T item)
{
this.item = item;
this.created = DateTime.UtcNow;
}
public T Item
{
get { return item; }
}
public DateTime Created
{
get { return created; }
}
public bool Equals(DistinctForItem<T> other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return EqualityComparer<T>.Default.Equals(Item, other.Item);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((DistinctForItem<T>)obj);
}
public override int GetHashCode()
{
return EqualityComparer<T>.Default.GetHashCode(Item);
}
public static bool operator ==(DistinctForItem<T> left, DistinctForItem<T> right)
{
return Equals(left, right);
}
public static bool operator !=(DistinctForItem<T> left, DistinctForItem<T> right)
{
return !Equals(left, right);
}
}
现在可以编写 DistinctFor
扩展方法:
public static IObservable<T> DistinctFor<T>(this IObservable<T> src,
TimeSpan validityPeriod)
{
//if HashSet<DistinctForItem<T>> actually allowed us the get at the
//items it contains it would be a better choice.
//However it doesn't, so we resort to
//Dictionary<DistinctForItem<T>, DistinctForItem<T>> instead.
var hs = new Dictionary<DistinctForItem<T>, DistinctForItem<T>>();
return src.Select(item => new DistinctForItem<T>(item)).Where(df =>
{
DistinctForItem<T> hsVal;
if (hs.TryGetValue(df, out hsVal))
{
var age = DateTime.UtcNow - hsVal.Created;
if (age < validityPeriod)
{
return false;
}
}
hs[df] = df;
return true;
}).Select(df => df.Item);
}
可以使用的:
Enumerable.Range(0, 1000)
.Select(i => i % 3)
.ToObservable()
.Pace(TimeSpan.FromMilliseconds(500)) //drip feeds the observable
.DistinctFor(TimeSpan.FromSeconds(5))
.Subscribe(x => Console.WriteLine(x));
作为参考,这是我对 Pace<T>
:
的实现
public static IObservable<T> Pace<T>(this IObservable<T> src, TimeSpan delay)
{
var timer = Observable
.Timer(
TimeSpan.FromSeconds(0),
delay
);
return src.Zip(timer, (s, t) => s);
}
接受的答案有缺陷;缺陷如下所示。这是解决方案的演示,带有测试批次:
TestScheduler ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(400.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);
var target = source.TimedDistinct(TimeSpan.FromMilliseconds(300), ts);
var expectedResults = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);
var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
解决方案包括 TimedDistinct
的一些重载,允许 IScheduler
注入,以及 IEqualityComparer<T>
注入,类似于 Distinct
。忽略所有这些重载,解决方案依赖于辅助方法 StateWhere
,它有点像 Scan
和 Where
的组合:它像 Where
一样过滤,但允许您可以像 Scan
一样在其中嵌入状态。
public static class RxState
{
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime)
{
return TimedDistinct(source, expirationTime, Scheduler.Default);
}
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IScheduler scheduler)
{
return TimedDistinct<TSource>(source, expirationTime, EqualityComparer<TSource>.Default, scheduler);
}
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer)
{
return TimedDistinct(source, expirationTime, comparer, Scheduler.Default);
}
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer, IScheduler scheduler)
{
var toReturn = source
.Timestamp(scheduler)
.StateWhere(
new Dictionary<TSource, DateTimeOffset>(comparer),
(state, item) => item.Value,
(state, item) => state
.Where(kvp => item.Timestamp - kvp.Value < expirationTime)
.Concat(
!state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
? Enumerable.Repeat(new KeyValuePair<TSource, DateTimeOffset>(item.Value, item.Timestamp), 1)
: Enumerable.Empty<KeyValuePair<TSource, DateTimeOffset>>()
)
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value, comparer),
(state, item) => !state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
);
return toReturn;
}
public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(
this IObservable<TSource> source,
TState initialState,
Func<TState, TSource, IObservable<TResult>> resultSelector,
Func<TState, TSource, TState> stateSelector
)
{
return source
.Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
.SelectMany(t => t.Item2);
}
public static IObservable<TResult> StateWhere<TSource, TState, TResult>(
this IObservable<TSource> source,
TState initialState,
Func<TState, TSource, TResult> resultSelector,
Func<TState, TSource, TState> stateSelector,
Func<TState, TSource, bool> filter
)
{
return source
.StateSelectMany(initialState, (state, item) =>
filter(state, item) ? Observable.Return(resultSelector(state, item)) : Observable.Empty<TResult>(),
stateSelector);
}
}
接受的答案有两个缺陷:
- 它不接受
IScheduler
注入,这意味着很难在 Rx 测试框架内进行测试。这很容易修复。
- 它依赖于可变状态,这在像 Rx 这样的多线程框架中不能很好地工作。
问题 #2 对于多个订阅者很明显:
var observable = Observable.Range(0, 5)
.DistinctFor(TimeSpan.MaxValue)
;
observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));
按照常规的 Rx 行为,输出应该输出 0-4 两次。相反,0-4 只输出一次。
这是另一个示例缺陷:
var subject = new Subject<int>();
var observable = subject
.DistinctFor(TimeSpan.MaxValue);
observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
这会输出 1 2 3
一次,而不是两次。
这是 MsTicks
的代码:
public static class RxTestingHelpers
{
public static long MsTicks(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
我想知道是否有任何方法可以在 .NET 的响应式扩展中实现 Distinct,使其在给定时间内工作,并且在这段时间之后它应该重置并允许再次返回值。我需要这个用于应用程序中的热源,它将工作一整年,现在停止,所以我担心性能,我需要在一段时间后允许这些值。还有 DistinctUntilChanged,但在我的例子中,值可以混合使用——例如:A A X A,DistinctUntilChanged 会给我 A X A,我需要结果 A X,在给定时间后,distinct 应该被重置。
使用包装器 class 为项目添加时间戳,但不考虑用于散列或相等性的时间戳(created
字段):
public class DistinctForItem<T> : IEquatable<DistinctForItem<T>>
{
private readonly T item;
private DateTime created;
public DistinctForItem(T item)
{
this.item = item;
this.created = DateTime.UtcNow;
}
public T Item
{
get { return item; }
}
public DateTime Created
{
get { return created; }
}
public bool Equals(DistinctForItem<T> other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return EqualityComparer<T>.Default.Equals(Item, other.Item);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != this.GetType()) return false;
return Equals((DistinctForItem<T>)obj);
}
public override int GetHashCode()
{
return EqualityComparer<T>.Default.GetHashCode(Item);
}
public static bool operator ==(DistinctForItem<T> left, DistinctForItem<T> right)
{
return Equals(left, right);
}
public static bool operator !=(DistinctForItem<T> left, DistinctForItem<T> right)
{
return !Equals(left, right);
}
}
现在可以编写 DistinctFor
扩展方法:
public static IObservable<T> DistinctFor<T>(this IObservable<T> src,
TimeSpan validityPeriod)
{
//if HashSet<DistinctForItem<T>> actually allowed us the get at the
//items it contains it would be a better choice.
//However it doesn't, so we resort to
//Dictionary<DistinctForItem<T>, DistinctForItem<T>> instead.
var hs = new Dictionary<DistinctForItem<T>, DistinctForItem<T>>();
return src.Select(item => new DistinctForItem<T>(item)).Where(df =>
{
DistinctForItem<T> hsVal;
if (hs.TryGetValue(df, out hsVal))
{
var age = DateTime.UtcNow - hsVal.Created;
if (age < validityPeriod)
{
return false;
}
}
hs[df] = df;
return true;
}).Select(df => df.Item);
}
可以使用的:
Enumerable.Range(0, 1000)
.Select(i => i % 3)
.ToObservable()
.Pace(TimeSpan.FromMilliseconds(500)) //drip feeds the observable
.DistinctFor(TimeSpan.FromSeconds(5))
.Subscribe(x => Console.WriteLine(x));
作为参考,这是我对 Pace<T>
:
public static IObservable<T> Pace<T>(this IObservable<T> src, TimeSpan delay)
{
var timer = Observable
.Timer(
TimeSpan.FromSeconds(0),
delay
);
return src.Zip(timer, (s, t) => s);
}
接受的答案有缺陷;缺陷如下所示。这是解决方案的演示,带有测试批次:
TestScheduler ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(400.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);
var target = source.TimedDistinct(TimeSpan.FromMilliseconds(300), ts);
var expectedResults = ts.CreateHotObservable<char>(
new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);
var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
解决方案包括 TimedDistinct
的一些重载,允许 IScheduler
注入,以及 IEqualityComparer<T>
注入,类似于 Distinct
。忽略所有这些重载,解决方案依赖于辅助方法 StateWhere
,它有点像 Scan
和 Where
的组合:它像 Where
一样过滤,但允许您可以像 Scan
一样在其中嵌入状态。
public static class RxState
{
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime)
{
return TimedDistinct(source, expirationTime, Scheduler.Default);
}
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IScheduler scheduler)
{
return TimedDistinct<TSource>(source, expirationTime, EqualityComparer<TSource>.Default, scheduler);
}
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer)
{
return TimedDistinct(source, expirationTime, comparer, Scheduler.Default);
}
public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer, IScheduler scheduler)
{
var toReturn = source
.Timestamp(scheduler)
.StateWhere(
new Dictionary<TSource, DateTimeOffset>(comparer),
(state, item) => item.Value,
(state, item) => state
.Where(kvp => item.Timestamp - kvp.Value < expirationTime)
.Concat(
!state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
? Enumerable.Repeat(new KeyValuePair<TSource, DateTimeOffset>(item.Value, item.Timestamp), 1)
: Enumerable.Empty<KeyValuePair<TSource, DateTimeOffset>>()
)
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value, comparer),
(state, item) => !state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
);
return toReturn;
}
public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(
this IObservable<TSource> source,
TState initialState,
Func<TState, TSource, IObservable<TResult>> resultSelector,
Func<TState, TSource, TState> stateSelector
)
{
return source
.Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
.SelectMany(t => t.Item2);
}
public static IObservable<TResult> StateWhere<TSource, TState, TResult>(
this IObservable<TSource> source,
TState initialState,
Func<TState, TSource, TResult> resultSelector,
Func<TState, TSource, TState> stateSelector,
Func<TState, TSource, bool> filter
)
{
return source
.StateSelectMany(initialState, (state, item) =>
filter(state, item) ? Observable.Return(resultSelector(state, item)) : Observable.Empty<TResult>(),
stateSelector);
}
}
接受的答案有两个缺陷:
- 它不接受
IScheduler
注入,这意味着很难在 Rx 测试框架内进行测试。这很容易修复。 - 它依赖于可变状态,这在像 Rx 这样的多线程框架中不能很好地工作。
问题 #2 对于多个订阅者很明显:
var observable = Observable.Range(0, 5)
.DistinctFor(TimeSpan.MaxValue)
;
observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));
按照常规的 Rx 行为,输出应该输出 0-4 两次。相反,0-4 只输出一次。
这是另一个示例缺陷:
var subject = new Subject<int>();
var observable = subject
.DistinctFor(TimeSpan.MaxValue);
observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
这会输出 1 2 3
一次,而不是两次。
这是 MsTicks
的代码:
public static class RxTestingHelpers
{
public static long MsTicks(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}