RX:测试 Sample\Interval\Switch 管道
RX: testing Sample\Interval\Switch pipeline
我有一些 Rx 代码,用来检测事件流上的不活动状态(inactivity):流上的每一次活动(activity)都会重置一个间隔计时器,而这个计时器就是 inactivity 的触发器。
/// <summary>
/// Contract for an object that exposes an <see cref="Activity"/> event.
/// </summary>
public interface IReportActivity
{
    /// <summary>
    /// Event used to signal activity; it carries no payload beyond the
    /// standard <see cref="EventArgs"/>.
    /// </summary>
    event EventHandler Activity;
}
/// <summary>
/// Contract for a component that can be observed for inactivity.
/// </summary>
public interface IInactivityMonitor
{
    /// <summary>
    /// Creates an observable of inactivity notifications.
    /// </summary>
    /// <param name="inactivityTimeout">Timeout handed to the implementation.</param>
    /// <returns>An observable sequence of <see cref="Unit"/> notifications.</returns>
    IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout);
}
/// <summary>
/// Turns the <see cref="IReportActivity.Activity"/> event of an activity source
/// into observables for sampled activity and for inactivity intervals.
/// </summary>
public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IReportActivity _activitySource;

    // Created on the first call to GetInactivityObservable and reused afterwards.
    private IObservable<Unit> _inactivityObservable;

    /// <summary>
    /// Stores the activity source and the scheduler provider for later use.
    /// </summary>
    /// <param name="activitySource">Object whose Activity event is observed.</param>
    /// <param name="schedulerProvider">Supplies the scheduler used for sampling and intervals.</param>
    public InactivityMonitor(IReportActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _activitySource = activitySource;
        _schedulerProvider = schedulerProvider;
    }

    /// <summary>
    /// Emits a notification every <paramref name="inactivityTimeout"/> for as long as
    /// no new element arrives from <see cref="GetInactivityObservable"/>.
    /// </summary>
    /// <param name="inactivityTimeout">Period of the interval that is restarted on each element.</param>
    /// <returns>An observable of inactivity notifications.</returns>
    /// <remarks>
    /// Each element of the sampled activity stream replaces the running interval with a
    /// new one (Switch). No interval exists until the first element has arrived, so a
    /// subscriber sees nothing if no activity has been reported yet.
    /// </remarks>
    public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
    {
        return GetInactivityObservable()
            .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                .Select(__ => Unit.Default))
            .Switch();
    }

    /// <summary>
    /// Returns the shared observable built from the Activity event, sampled once per
    /// second on the NewThread scheduler. Despite the method name, its elements are
    /// derived from activity events.
    /// </summary>
    /// <returns>The cached, ref-counted published observable.</returns>
    public IObservable<Unit> GetInactivityObservable()
    {
        // The delegate type must be EventHandler to match the declaration of
        // IReportActivity.Activity; EventHandler<EventArgs> cannot be added to that event.
        return _inactivityObservable = _inactivityObservable ??
            Observable.FromEventPattern<EventHandler, EventArgs>(
                    h => _activitySource.Activity += h,
                    h => _activitySource.Activity -= h)
                .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
                .Select(_ => Unit.Default)
                .Publish()
                .RefCount();
    }
}
代码可以按要求工作(不过我认为每秒都重新创建一次间隔有些过头了——ObserveInactivity 应该在超时之前进行采样,并根据最后一次 activity 的时间戳来重置)。
我遇到的真正问题是尝试测试此代码。
/// <summary>
/// NUnit tests for <see cref="InactivityMonitor"/> driven by virtual-time test schedulers.
/// </summary>
[TestFixture]
public class InactivityMonitorTests
{
    private TestSchedulers _testSchedulers;
    private InactivityMonitor _sut;

    // NOTE(review): AutoMock is not declared in this file; the Use/CreateInstance/GetMock
    // calls below look like an auto-mocking container API - confirm the exact type name.
    private AutoMock _moqqer;

    /// <summary>
    /// Builds a fresh container, registers the test schedulers as the
    /// <see cref="ISchedulerProvider"/> and creates the system under test.
    /// </summary>
    [SetUp]
    public void Setup()
    {
        _moqqer = new AutoMock();
        _testSchedulers = new TestSchedulers();
        _moqqer.Use<ISchedulerProvider>(_testSchedulers);
        _sut = _moqqer.CreateInstance<InactivityMonitor>();
    }

    // this test passes
    [Test]
    public void GetInactivityObservable_ActivityDetected_ReportsActivity()
    {
        var activityObserved = false;
        _sut.GetInactivityObservable()
            .Subscribe(x => activityObserved = true);

        RaiseActivityEvent();
        // Advance the virtual clock of the NewThread scheduler by 11 seconds.
        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        activityObserved.Should().BeTrue();
    }

    /// <summary>
    /// Raises the Activity event on the mocked <see cref="IReportActivity"/>.
    /// </summary>
    private void RaiseActivityEvent()
    {
        _moqqer.GetMock<IReportActivity>()
            .Raise(m => m.Activity += null, EventArgs.Empty);
    }

    // this test fails, The interval never appears to get set up via the tests.
    [Test]
    public void ObserveActivity_InactivtyTimeoutExceeded_NotificationReceived()
    {
        var inactivityObserved = false;
        _sut.ObserveInactivity(TimeSpan.FromSeconds(10))
            .Subscribe(x => inactivityObserved = true);

        // No activity event is raised here before virtual time is advanced.
        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        inactivityObserved.Should().BeTrue();
    }
}
/// <summary>
/// Provides one <see cref="IScheduler"/> per named scheduling context so that
/// consumers do not reference concrete schedulers directly.
/// </summary>
public interface ISchedulerProvider
{
    /// <summary>Scheduler exposed under the name CurrentThread.</summary>
    IScheduler CurrentThread { get; }

    /// <summary>Scheduler exposed under the name Dispatcher.</summary>
    IScheduler Dispatcher { get; }

    /// <summary>Scheduler exposed under the name Immediate.</summary>
    IScheduler Immediate { get; }

    /// <summary>Scheduler exposed under the name NewThread.</summary>
    IScheduler NewThread { get; }

    /// <summary>Scheduler exposed under the name ThreadPool.</summary>
    IScheduler ThreadPool { get; }

    /// <summary>Scheduler exposed under the name TaskPool.</summary>
    IScheduler TaskPool { get; }
}
/// <summary>
/// <see cref="ISchedulerProvider"/> implementation in which every scheduler is a
/// separate <see cref="TestScheduler"/> instance. The public properties expose the
/// concrete <see cref="TestScheduler"/> type; the explicit interface members return
/// the same instances typed as <see cref="IScheduler"/>.
/// </summary>
public sealed class TestSchedulers : ISchedulerProvider
{
    private readonly TestScheduler _currentThread = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();
    private readonly TestScheduler _immediate = new TestScheduler();
    private readonly TestScheduler _newThread = new TestScheduler();
    private readonly TestScheduler _threadPool = new TestScheduler();
    private readonly TestScheduler _taskPool = new TestScheduler();

    // Strongly typed accessors for use by tests.
    public TestScheduler CurrentThread
    {
        get { return _currentThread; }
    }

    public TestScheduler Dispatcher
    {
        get { return _dispatcher; }
    }

    public TestScheduler Immediate
    {
        get { return _immediate; }
    }

    public TestScheduler NewThread
    {
        get { return _newThread; }
    }

    public TestScheduler ThreadPool
    {
        get { return _threadPool; }
    }

    public TestScheduler TaskPool
    {
        get { return _taskPool; }
    }

    #region Explicit implementation of ISchedulerProvider

    // Each interface member hands out the same instance as the matching public property.
    IScheduler ISchedulerProvider.CurrentThread
    {
        get { return CurrentThread; }
    }

    IScheduler ISchedulerProvider.Dispatcher
    {
        get { return Dispatcher; }
    }

    IScheduler ISchedulerProvider.Immediate
    {
        get { return Immediate; }
    }

    IScheduler ISchedulerProvider.NewThread
    {
        get { return NewThread; }
    }

    IScheduler ISchedulerProvider.ThreadPool
    {
        get { return ThreadPool; }
    }

    IScheduler ISchedulerProvider.TaskPool
    {
        get { return TaskPool; }
    }

    #endregion
}
我尝试了多种启动调度程序的方式:在订阅之前和之后启动,也尝试过在另一个调度程序上订阅并在订阅之前启动它,但都没有成功。
编辑:调试显示永远不会在 ObserveInactivity 方法中创建间隔。
希望有人能指出我正确的方向。
谢谢
所以,有机会回到这个问题上,我发现了我的方法中的错误。
Observable.Interval 从未启动,因为(那个命名有误的 activity 可观察序列)从未报告过任何 activity;而在实际运行应用时,很可能至少已经引发过 1 个事件,所以问题没有暴露出来。为了解决这个问题,我添加了 .StartWith(Unit.Default),
如下所示,这样每个观察者的超时都会立即开始计时;原始实现缺少这一点,显然是一个潜在的错误。
/// <summary>
/// Emits a notification every <paramref name="inactivityTimeout"/> for as long as
/// no new element arrives from <see cref="GetInactivityObservable"/>.
/// </summary>
/// <param name="inactivityTimeout">Period of the interval that is restarted on each element.</param>
/// <returns>An observable of inactivity notifications.</returns>
public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
{
    return GetInactivityObservable()
        // Seed one element per subscriber so the first interval is created at
        // subscription time instead of waiting for the first activity.
        .StartWith(Unit.Default) // <--
        // Every element (seed or activity) swaps in a fresh interval via Switch.
        .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
            .Select(__ => Unit.Default))
        .Switch();
}
我会重新组织您的 inactivity 查询:不要先由 _activitySource.Activity 触发、然后再加上一段时间,
而是直接期待在 XXX 时间段内出现 failure/silence(即没有任何事件),并就此发出通知。
然后再扩展该查询,使其在出现 activity 时 cancel/restart(取消并重新开始计时)。
也就是说,不要这样写:
/// <summary>
/// Returns the shared observable built from the Activity event, sampled once per
/// second on the NewThread scheduler. The instance is created on first use and cached.
/// </summary>
/// <returns>The cached, ref-counted published observable.</returns>
public IObservable<Unit> GetInactivityObservable()
{
    // The delegate type must be EventHandler to match the declaration of
    // IReportActivity.Activity; EventHandler<EventArgs> cannot be added to that event.
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler, EventArgs>(
                h => _activitySource.Activity += h,
                h => _activitySource.Activity -= h)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(_ => Unit.Default)
            .Publish()
            .RefCount();
}
将其更改为
/// <summary>
/// Returns the shared observable that emits once a one-second timer elapses without
/// being replaced by a newer sampled element. The instance is created on first use
/// and cached.
/// </summary>
/// <returns>The cached, ref-counted published observable.</returns>
public IObservable<Unit> GetInactivityObservable()
{
    if (_inactivityObservable == null)
    {
        // Used both as the sampling period and as the timer due time.
        var period = TimeSpan.FromSeconds(1);

        // The delegate type must be EventHandler to match the declaration of
        // IReportActivity.Activity; EventHandler<EventArgs> cannot be added to that event.
        _inactivityObservable =
            Observable.FromEventPattern<EventHandler, EventArgs>(
                    h => _activitySource.Activity += h,
                    h => _activitySource.Activity -= h)
                .Select(_ => Unit.Default)
                // Seed so a timer can be started without waiting for a first event.
                .StartWith(Unit.Default)
                .Sample(period, _schedulerProvider.NewThread)
                // Each sampled element replaces the pending timer with a new one.
                .Select(a => Observable.Timer(period, _schedulerProvider.NewThread).Select(_ => Unit.Default))
                .Switch()
                .Publish()
                .RefCount();
    }

    return _inactivityObservable;
}
然后,由于 Rx 本身就是惰性求值的,我会把这个方法改成静态且无副作用的方法,并在您的构造函数中调用它,或者干脆直接将其内联。
/// <summary>
/// Variant of the monitor that builds its inactivity observable once, in the
/// constructor, from the Activity event of the supplied source.
/// </summary>
public class InactivityMonitor : IInactivityMonitor
{
    // Due time of the timer that is restarted on every element.
    private static readonly TimeSpan SilencePeriod = TimeSpan.FromSeconds(1);

    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IObservable<Unit> _inactivityObservable;

    /// <summary>
    /// Composes the inactivity query; nothing subscribes to the event until the
    /// resulting observable itself is subscribed to.
    /// </summary>
    /// <param name="activitySource">Object whose Activity event is observed.</param>
    /// <param name="schedulerProvider">Supplies the scheduler used for the timers.</param>
    public InactivityMonitor(IReportActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _schedulerProvider = schedulerProvider;

        // The lambdas capture the constructor parameter; this class has no
        // _activitySource field. The delegate type is EventHandler to match the
        // declaration of IReportActivity.Activity.
        _inactivityObservable = Observable.FromEventPattern<EventHandler, EventArgs>(
                h => activitySource.Activity += h,
                h => activitySource.Activity -= h)
            // Map to Unit before seeding so the seed is a plain value rather than null.
            .Select(_ => Unit.Default)
            .StartWith(Unit.Default)
            // Each element (seed or activity) replaces the pending timer with a new one.
            .Select(_ => Observable.Timer(SilencePeriod, _schedulerProvider.NewThread))
            .Switch()
            .Select(_ => Unit.Default)
            .Publish()
            .RefCount();
    }
    //...
}
我有一些 Rx 代码,用来检测事件流上的不活动状态(inactivity):流上的每一次活动(activity)都会重置一个间隔计时器,而这个计时器就是 inactivity 的触发器。
/// <summary>
/// Contract for an object that exposes an <see cref="Activity"/> event.
/// </summary>
public interface IReportActivity
{
    /// <summary>
    /// Event used to signal activity; it carries no payload beyond the
    /// standard <see cref="EventArgs"/>.
    /// </summary>
    event EventHandler Activity;
}
/// <summary>
/// Contract for a component that can be observed for inactivity.
/// </summary>
public interface IInactivityMonitor
{
    /// <summary>
    /// Creates an observable of inactivity notifications.
    /// </summary>
    /// <param name="inactivityTimeout">Timeout handed to the implementation.</param>
    /// <returns>An observable sequence of <see cref="Unit"/> notifications.</returns>
    IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout);
}
/// <summary>
/// Turns the <see cref="IReportActivity.Activity"/> event of an activity source
/// into observables for sampled activity and for inactivity intervals.
/// </summary>
public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IReportActivity _activitySource;

    // Created on the first call to GetInactivityObservable and reused afterwards.
    private IObservable<Unit> _inactivityObservable;

    /// <summary>
    /// Stores the activity source and the scheduler provider for later use.
    /// </summary>
    /// <param name="activitySource">Object whose Activity event is observed.</param>
    /// <param name="schedulerProvider">Supplies the scheduler used for sampling and intervals.</param>
    public InactivityMonitor(IReportActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _activitySource = activitySource;
        _schedulerProvider = schedulerProvider;
    }

    /// <summary>
    /// Emits a notification every <paramref name="inactivityTimeout"/> for as long as
    /// no new element arrives from <see cref="GetInactivityObservable"/>.
    /// </summary>
    /// <param name="inactivityTimeout">Period of the interval that is restarted on each element.</param>
    /// <returns>An observable of inactivity notifications.</returns>
    /// <remarks>
    /// Each element of the sampled activity stream replaces the running interval with a
    /// new one (Switch). No interval exists until the first element has arrived, so a
    /// subscriber sees nothing if no activity has been reported yet.
    /// </remarks>
    public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
    {
        return GetInactivityObservable()
            .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                .Select(__ => Unit.Default))
            .Switch();
    }

    /// <summary>
    /// Returns the shared observable built from the Activity event, sampled once per
    /// second on the NewThread scheduler. Despite the method name, its elements are
    /// derived from activity events.
    /// </summary>
    /// <returns>The cached, ref-counted published observable.</returns>
    public IObservable<Unit> GetInactivityObservable()
    {
        // The delegate type must be EventHandler to match the declaration of
        // IReportActivity.Activity; EventHandler<EventArgs> cannot be added to that event.
        return _inactivityObservable = _inactivityObservable ??
            Observable.FromEventPattern<EventHandler, EventArgs>(
                    h => _activitySource.Activity += h,
                    h => _activitySource.Activity -= h)
                .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
                .Select(_ => Unit.Default)
                .Publish()
                .RefCount();
    }
}
代码可以按要求工作(不过我认为每秒都重新创建一次间隔有些过头了——ObserveInactivity 应该在超时之前进行采样,并根据最后一次 activity 的时间戳来重置)。
我遇到的真正问题是尝试测试此代码。
/// <summary>
/// NUnit tests for <see cref="InactivityMonitor"/> driven by virtual-time test schedulers.
/// </summary>
[TestFixture]
public class InactivityMonitorTests
{
    private TestSchedulers _testSchedulers;
    private InactivityMonitor _sut;

    // NOTE(review): AutoMock is not declared in this file; the Use/CreateInstance/GetMock
    // calls below look like an auto-mocking container API - confirm the exact type name.
    private AutoMock _moqqer;

    /// <summary>
    /// Builds a fresh container, registers the test schedulers as the
    /// <see cref="ISchedulerProvider"/> and creates the system under test.
    /// </summary>
    [SetUp]
    public void Setup()
    {
        _moqqer = new AutoMock();
        _testSchedulers = new TestSchedulers();
        _moqqer.Use<ISchedulerProvider>(_testSchedulers);
        _sut = _moqqer.CreateInstance<InactivityMonitor>();
    }

    // this test passes
    [Test]
    public void GetInactivityObservable_ActivityDetected_ReportsActivity()
    {
        var activityObserved = false;
        _sut.GetInactivityObservable()
            .Subscribe(x => activityObserved = true);

        RaiseActivityEvent();
        // Advance the virtual clock of the NewThread scheduler by 11 seconds.
        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        activityObserved.Should().BeTrue();
    }

    /// <summary>
    /// Raises the Activity event on the mocked <see cref="IReportActivity"/>.
    /// </summary>
    private void RaiseActivityEvent()
    {
        _moqqer.GetMock<IReportActivity>()
            .Raise(m => m.Activity += null, EventArgs.Empty);
    }

    // this test fails, The interval never appears to get set up via the tests.
    [Test]
    public void ObserveActivity_InactivtyTimeoutExceeded_NotificationReceived()
    {
        var inactivityObserved = false;
        _sut.ObserveInactivity(TimeSpan.FromSeconds(10))
            .Subscribe(x => inactivityObserved = true);

        // No activity event is raised here before virtual time is advanced.
        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        inactivityObserved.Should().BeTrue();
    }
}
/// <summary>
/// Provides one <see cref="IScheduler"/> per named scheduling context so that
/// consumers do not reference concrete schedulers directly.
/// </summary>
public interface ISchedulerProvider
{
    /// <summary>Scheduler exposed under the name CurrentThread.</summary>
    IScheduler CurrentThread { get; }

    /// <summary>Scheduler exposed under the name Dispatcher.</summary>
    IScheduler Dispatcher { get; }

    /// <summary>Scheduler exposed under the name Immediate.</summary>
    IScheduler Immediate { get; }

    /// <summary>Scheduler exposed under the name NewThread.</summary>
    IScheduler NewThread { get; }

    /// <summary>Scheduler exposed under the name ThreadPool.</summary>
    IScheduler ThreadPool { get; }

    /// <summary>Scheduler exposed under the name TaskPool.</summary>
    IScheduler TaskPool { get; }
}
/// <summary>
/// <see cref="ISchedulerProvider"/> implementation in which every scheduler is a
/// separate <see cref="TestScheduler"/> instance. The public properties expose the
/// concrete <see cref="TestScheduler"/> type; the explicit interface members return
/// the same instances typed as <see cref="IScheduler"/>.
/// </summary>
public sealed class TestSchedulers : ISchedulerProvider
{
    private readonly TestScheduler _currentThread = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();
    private readonly TestScheduler _immediate = new TestScheduler();
    private readonly TestScheduler _newThread = new TestScheduler();
    private readonly TestScheduler _threadPool = new TestScheduler();
    private readonly TestScheduler _taskPool = new TestScheduler();

    // Strongly typed accessors for use by tests.
    public TestScheduler CurrentThread
    {
        get { return _currentThread; }
    }

    public TestScheduler Dispatcher
    {
        get { return _dispatcher; }
    }

    public TestScheduler Immediate
    {
        get { return _immediate; }
    }

    public TestScheduler NewThread
    {
        get { return _newThread; }
    }

    public TestScheduler ThreadPool
    {
        get { return _threadPool; }
    }

    public TestScheduler TaskPool
    {
        get { return _taskPool; }
    }

    #region Explicit implementation of ISchedulerProvider

    // Each interface member hands out the same instance as the matching public property.
    IScheduler ISchedulerProvider.CurrentThread
    {
        get { return CurrentThread; }
    }

    IScheduler ISchedulerProvider.Dispatcher
    {
        get { return Dispatcher; }
    }

    IScheduler ISchedulerProvider.Immediate
    {
        get { return Immediate; }
    }

    IScheduler ISchedulerProvider.NewThread
    {
        get { return NewThread; }
    }

    IScheduler ISchedulerProvider.ThreadPool
    {
        get { return ThreadPool; }
    }

    IScheduler ISchedulerProvider.TaskPool
    {
        get { return TaskPool; }
    }

    #endregion
}
我尝试了多种启动调度程序的方式:在订阅之前和之后启动,也尝试过在另一个调度程序上订阅并在订阅之前启动它,但都没有成功。
编辑:调试显示永远不会在 ObserveInactivity 方法中创建间隔。
希望有人能指出我正确的方向。
谢谢
所以,有机会回到这个问题上,我发现了我的方法中的错误。
Observable.Interval 从未启动,因为(那个命名有误的 activity 可观察序列)从未报告过任何 activity;而在实际运行应用时,很可能至少已经引发过 1 个事件,所以问题没有暴露出来。为了解决这个问题,我添加了 .StartWith(Unit.Default),
如下所示,这样每个观察者的超时都会立即开始计时;原始实现缺少这一点,显然是一个潜在的错误。
/// <summary>
/// Emits a notification every <paramref name="inactivityTimeout"/> for as long as
/// no new element arrives from <see cref="GetInactivityObservable"/>.
/// </summary>
/// <param name="inactivityTimeout">Period of the interval that is restarted on each element.</param>
/// <returns>An observable of inactivity notifications.</returns>
public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
{
    return GetInactivityObservable()
        // Seed one element per subscriber so the first interval is created at
        // subscription time instead of waiting for the first activity.
        .StartWith(Unit.Default) // <--
        // Every element (seed or activity) swaps in a fresh interval via Switch.
        .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
            .Select(__ => Unit.Default))
        .Switch();
}
我会重新组织您的 inactivity 查询:不要先由 _activitySource.Activity 触发、然后再加上一段时间,
而是直接期待在 XXX 时间段内出现 failure/silence(即没有任何事件),并就此发出通知;然后再扩展该查询,使其在出现 activity 时 cancel/restart(取消并重新开始计时)。
也就是说,不要这样写:
/// <summary>
/// Returns the shared observable built from the Activity event, sampled once per
/// second on the NewThread scheduler. The instance is created on first use and cached.
/// </summary>
/// <returns>The cached, ref-counted published observable.</returns>
public IObservable<Unit> GetInactivityObservable()
{
    // The delegate type must be EventHandler to match the declaration of
    // IReportActivity.Activity; EventHandler<EventArgs> cannot be added to that event.
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler, EventArgs>(
                h => _activitySource.Activity += h,
                h => _activitySource.Activity -= h)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(_ => Unit.Default)
            .Publish()
            .RefCount();
}
将其更改为
/// <summary>
/// Returns the shared observable that emits once a one-second timer elapses without
/// being replaced by a newer sampled element. The instance is created on first use
/// and cached.
/// </summary>
/// <returns>The cached, ref-counted published observable.</returns>
public IObservable<Unit> GetInactivityObservable()
{
    if (_inactivityObservable == null)
    {
        // Used both as the sampling period and as the timer due time.
        var period = TimeSpan.FromSeconds(1);

        // The delegate type must be EventHandler to match the declaration of
        // IReportActivity.Activity; EventHandler<EventArgs> cannot be added to that event.
        _inactivityObservable =
            Observable.FromEventPattern<EventHandler, EventArgs>(
                    h => _activitySource.Activity += h,
                    h => _activitySource.Activity -= h)
                .Select(_ => Unit.Default)
                // Seed so a timer can be started without waiting for a first event.
                .StartWith(Unit.Default)
                .Sample(period, _schedulerProvider.NewThread)
                // Each sampled element replaces the pending timer with a new one.
                .Select(a => Observable.Timer(period, _schedulerProvider.NewThread).Select(_ => Unit.Default))
                .Switch()
                .Publish()
                .RefCount();
    }

    return _inactivityObservable;
}
然后,由于 Rx 本身就是惰性求值的,我会把这个方法改成静态且无副作用的方法,并在您的构造函数中调用它,或者干脆直接将其内联。
/// <summary>
/// Variant of the monitor that builds its inactivity observable once, in the
/// constructor, from the Activity event of the supplied source.
/// </summary>
public class InactivityMonitor : IInactivityMonitor
{
    // Due time of the timer that is restarted on every element.
    private static readonly TimeSpan SilencePeriod = TimeSpan.FromSeconds(1);

    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IObservable<Unit> _inactivityObservable;

    /// <summary>
    /// Composes the inactivity query; nothing subscribes to the event until the
    /// resulting observable itself is subscribed to.
    /// </summary>
    /// <param name="activitySource">Object whose Activity event is observed.</param>
    /// <param name="schedulerProvider">Supplies the scheduler used for the timers.</param>
    public InactivityMonitor(IReportActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _schedulerProvider = schedulerProvider;

        // The lambdas capture the constructor parameter; this class has no
        // _activitySource field. The delegate type is EventHandler to match the
        // declaration of IReportActivity.Activity.
        _inactivityObservable = Observable.FromEventPattern<EventHandler, EventArgs>(
                h => activitySource.Activity += h,
                h => activitySource.Activity -= h)
            // Map to Unit before seeding so the seed is a plain value rather than null.
            .Select(_ => Unit.Default)
            .StartWith(Unit.Default)
            // Each element (seed or activity) replaces the pending timer with a new one.
            .Select(_ => Observable.Timer(SilencePeriod, _schedulerProvider.NewThread))
            .Switch()
            .Select(_ => Unit.Default)
            .Publish()
            .RefCount();
    }
    //...
}