测试 Rx 中的延迟
Testing delays in Rx
我正在尝试弄清楚如何测试以下函数,该函数在 Observable.ObserveOn
.
的内部队列周围添加了监控
public IObservable<T> MonitorBuffer<T>(IObservable<T> source, Action<int> monitor, TimeSpan interval, IScheduler scheduler)
{
return Observable.Create<T>(ob =>
{
int count = 0;
return new CompositeDisposable(source
.Do(_ => Interlocked.Increment(ref count))
.ObserveOn(scheduler)
.Do(_ => Interlocked.Decrement(ref count))
.Subscribe(ob),
Observable.Interval(interval, scheduler).Select(_ => count).DistinctUntilChanged().Subscribe(monitor)
);
});
}
我设想的是这样的:
var ts = new TestScheduler();
var input = Enumerable.Range(1, 8).Select(i => OnNext(i * 10, i)).ToArray();
var hot = ts.CreateHotObservable(input);
var observer = ts.CreateObserver<int>();
var log = new Subject<int>();
var monitor = ts.CreateObserver<int>();
var ticks = TimeSpan.FromTicks(5);
var buffered = MonitorBuffer(hot, log.OnNext, ticks, ts);
log.Subscribe(monitor);
buffered.Do(x => { /*if(x == 3) Introduce delay here */}).Subscribe(observer);
ts.AdvanceTo(100);
observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);
问题是,我可以在 Do 中输入什么来获得临时下游延迟的预期效果。
我正在寻找类似这样的结果:
//time: 0--------10--------20--------30--------40--------50--------60--------70--------
//source: ---------1---------2---------3---------4---------5---------6---------7---------
//output: ---------1---------2-----------------------------345-------6---------7---------
//log: ----0-------------------------1---------2---------2----0-----------------------
(注意:前段时间问了一个similar question,但不是很清楚,现在完全重写有点晚了)。
我想我已经搞定了...
秘密是有两个可以独立推进的调度器。
基于问题中的测试代码:
var inputscheduler = new TestScheduler();
(...)
//different scheduler for buffer/observeOn
var bufferScheduler = new TestScheduler();
var buffered = MonitorBuffer(hot, log.OnNext, ticks, bufferScheduler);
log.Subscribe(monitor);
buffered.Subscribe(observer);
//instead of inserting something downstream, use scheduler advances
for (int i = 3; i < 80; i++)
{
inputscheduler.AdvanceTo(i);
if (i < 25|| i > 45) bufferscheduler.AdvanceTo(i);
}
observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);
我正在尝试弄清楚如何测试以下函数,该函数在 Observable.ObserveOn
.
public IObservable<T> MonitorBuffer<T>(IObservable<T> source, Action<int> monitor, TimeSpan interval, IScheduler scheduler)
{
return Observable.Create<T>(ob =>
{
int count = 0;
return new CompositeDisposable(source
.Do(_ => Interlocked.Increment(ref count))
.ObserveOn(scheduler)
.Do(_ => Interlocked.Decrement(ref count))
.Subscribe(ob),
Observable.Interval(interval, scheduler).Select(_ => count).DistinctUntilChanged().Subscribe(monitor)
);
});
}
我设想的是这样的:
var ts = new TestScheduler();
var input = Enumerable.Range(1, 8).Select(i => OnNext(i * 10, i)).ToArray();
var hot = ts.CreateHotObservable(input);
var observer = ts.CreateObserver<int>();
var log = new Subject<int>();
var monitor = ts.CreateObserver<int>();
var ticks = TimeSpan.FromTicks(5);
var buffered = MonitorBuffer(hot, log.OnNext, ticks, ts);
log.Subscribe(monitor);
buffered.Do(x => { /*if(x == 3) Introduce delay here */}).Subscribe(observer);
ts.AdvanceTo(100);
observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);
问题是,我可以在 Do 中输入什么来获得临时下游延迟的预期效果。
我正在寻找类似这样的结果:
//time: 0--------10--------20--------30--------40--------50--------60--------70--------
//source: ---------1---------2---------3---------4---------5---------6---------7---------
//output: ---------1---------2-----------------------------345-------6---------7---------
//log: ----0-------------------------1---------2---------2----0-----------------------
(注意:前段时间问了一个similar question,但不是很清楚,现在完全重写有点晚了)。
我想我已经搞定了...
秘密是有两个可以独立推进的调度器。
基于问题中的测试代码:
var inputscheduler = new TestScheduler();
(...)
//different scheduler for buffer/observeOn
var bufferScheduler = new TestScheduler();
var buffered = MonitorBuffer(hot, log.OnNext, ticks, bufferScheduler);
log.Subscribe(monitor);
buffered.Subscribe(observer);
//instead of inserting something downstream, use scheduler advances
for (int i = 3; i < 80; i++)
{
inputscheduler.AdvanceTo(i);
if (i < 25|| i > 45) bufferscheduler.AdvanceTo(i);
}
observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);