RX:测试 Sample\Interval\Switch 管道

RX: testing Sample\Interval\Switch pipeline

我有一些 RX 代码来测试事件流上的 inactivity,流上的 activity 重置了一个间隔,即 inactivity 触发器。

/// <summary>
/// Exposes an <see cref="Activity"/> event. <c>InactivityMonitor</c> subscribes to this
/// event and treats each occurrence as an activity notification.
/// </summary>
public interface IReportActivity
{
    /// <summary>
    /// Non-generic <see cref="EventHandler"/> event used as the activity signal.
    /// </summary>
    event EventHandler Activity;
}

/// <summary>
/// Contract for components that expose inactivity notifications as an observable sequence.
/// </summary>
public interface IInactivityMonitor
{
    /// <summary>
    /// Returns an observable sequence of inactivity notifications.
    /// </summary>
    /// <param name="inactivityTimeout">
    /// The inactivity timeout to apply; how it is applied is up to the implementation.
    /// </param>
    /// <returns>An observable sequence of <see cref="Unit"/> values.</returns>
    IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout);
}

/// <summary>
/// Turns the <see cref="IReportActivity.Activity"/> event into inactivity notifications.
/// </summary>
public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IReportActivity _activitySource;

    // Lazily created by GetInactivityObservable and then reused for every caller.
    private IObservable<Unit> _inactivityObservable;

    /// <summary>
    /// Creates a monitor over the given activity source.
    /// </summary>
    /// <param name="activitySource">Object whose <c>Activity</c> event is observed.</param>
    /// <param name="schedulerProvider">Supplies the scheduler used for sampling and for the interval.</param>
    public InactivityMonitor(IReportActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _activitySource = activitySource;
        _schedulerProvider = schedulerProvider;
    }

    /// <summary>
    /// Emits a <see cref="Unit"/> every <paramref name="inactivityTimeout"/> for as long as no
    /// new (sampled) activity notification arrives; each notification restarts the interval.
    /// </summary>
    /// <param name="inactivityTimeout">Period of the interval started after each activity notification.</param>
    /// <returns>An observable sequence of inactivity notifications.</returns>
    /// <remarks>
    /// The interval is only created in response to an activity notification. If the activity
    /// stream never emits, no interval exists and the returned sequence stays silent.
    /// </remarks>
    public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
    {
        return GetInactivityObservable()
            // One fresh interval per activity notification; Switch() disposes the previous one.
            .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                    .Select(__ => Unit.Default))
            .Switch();
    }

    /// <summary>
    /// Returns the shared activity stream: the <c>Activity</c> event sampled once per second
    /// and projected to <see cref="Unit"/>.
    /// </summary>
    /// <returns>
    /// The cached, ref-counted observable. Note that, despite the method name, it emits on
    /// activity rather than on inactivity.
    /// </returns>
    public IObservable<Unit> GetInactivityObservable()
    {
        // IReportActivity.Activity is a non-generic EventHandler, so the delegate type
        // argument must be EventHandler for the += / -= lambdas to bind.
        return _inactivityObservable = _inactivityObservable ??
            Observable.FromEventPattern<EventHandler, EventArgs>(
                h => _activitySource.Activity += h,
                h => _activitySource.Activity -= h)
                .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
                .Select(_ => Unit.Default)
                // Share a single event subscription between all observers.
                .Publish()
                .RefCount();
    }
}

代码按要求工作(不过我认为每秒重新创建一次间隔有些过头了——ObserveInactivity 应该在超时之前采样,并根据最后一次 activity 的时间戳进行重置)。

我遇到的真正问题是尝试测试此代码。

/// <summary>
/// Tests for <see cref="InactivityMonitor"/>, driven by virtual time through
/// <see cref="TestSchedulers"/>.
/// </summary>
[TestFixture]
public class InactivityMonitorTests
{
    private TestSchedulers _testSchedulers;
    private InactivityMonitor _sut;
    private AutoMock _moqqer;

    /// <summary>
    /// Builds a fresh auto-mocking container, test schedulers and system under test before each test.
    /// </summary>
    // The fixture declares no base class, so there is nothing to override; NUnit's
    // [SetUp] attribute is what makes this run before every test.
    [SetUp]
    public void Setup()
    {
        _moqqer = new AutoMock();
        _testSchedulers = new TestSchedulers();

        // Register the test schedulers so the monitor is created with virtual-time schedulers.
        _moqqer.Use<ISchedulerProvider>(_testSchedulers);
        _sut = _moqqer.CreateInstance<InactivityMonitor>();
    }

    // this test passes
    [Test]
    public void GetInactivityObservable_ActivityDetected_ReportsActivity()
    {
        var activityObserved = false;

        _sut.GetInactivityObservable()
            .Subscribe(x => activityObserved = true);

        RaiseActivityEvent();

        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        activityObserved.Should().BeTrue();
    }

    /// <summary>
    /// Raises the <c>Activity</c> event on the mocked <see cref="IReportActivity"/>.
    /// </summary>
    private void RaiseActivityEvent()
    {
        _moqqer.GetMock<IReportActivity>()
            .Raise(m => m.Activity += null, EventArgs.Empty);
    }

    // this test fails, The interval never appears to get set up via the tests.
    [Test]
    public void ObserveActivity_InactivtyTimeoutExceeded_NotificationReceived()
    {
        var inactivityObserved = false;

        _sut.ObserveInactivity(TimeSpan.FromSeconds(10))
            .Subscribe(x => inactivityObserved = true);

        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);

        inactivityObserved.Should().BeTrue();
    }
}

/// <summary>
/// Provides a set of named <see cref="IScheduler"/> instances so that the schedulers a
/// component uses can be supplied from outside (for example by <c>TestSchedulers</c>).
/// </summary>
public interface ISchedulerProvider
{
    /// <summary>Scheduler exposed under the name "CurrentThread".</summary>
    IScheduler CurrentThread { get; }
    /// <summary>Scheduler exposed under the name "Dispatcher".</summary>
    IScheduler Dispatcher { get; }
    /// <summary>Scheduler exposed under the name "Immediate".</summary>
    IScheduler Immediate { get; }
    /// <summary>Scheduler exposed under the name "NewThread"; the one <c>InactivityMonitor</c> uses.</summary>
    IScheduler NewThread { get; }
    /// <summary>Scheduler exposed under the name "ThreadPool".</summary>
    IScheduler ThreadPool { get; }

    /// <summary>Scheduler exposed under the name "TaskPool".</summary>
    IScheduler TaskPool { get; } 
}

/// <summary>
/// <see cref="ISchedulerProvider"/> implementation for tests: every scheduler slot is backed
/// by its own <see cref="TestScheduler"/> instance.
/// </summary>
public sealed class TestSchedulers : ISchedulerProvider
{
    private readonly TestScheduler _currentThread = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();
    private readonly TestScheduler _immediate = new TestScheduler();
    private readonly TestScheduler _newThread = new TestScheduler();
    private readonly TestScheduler _threadPool = new TestScheduler();
    private readonly TestScheduler _taskPool = new TestScheduler();

    // Typed accessors: hand the concrete TestScheduler to test code.

    public TestScheduler CurrentThread
    {
        get { return _currentThread; }
    }

    public TestScheduler Dispatcher
    {
        get { return _dispatcher; }
    }

    public TestScheduler Immediate
    {
        get { return _immediate; }
    }

    public TestScheduler NewThread
    {
        get { return _newThread; }
    }

    public TestScheduler ThreadPool
    {
        get { return _threadPool; }
    }

    public TestScheduler TaskPool
    {
        get { return _taskPool; }
    }

    #region Explicit implementation of ISchedulerProvider

    // Each interface member returns the same instance as the typed property of the same name.

    IScheduler ISchedulerProvider.CurrentThread
    {
        get { return CurrentThread; }
    }

    IScheduler ISchedulerProvider.Dispatcher
    {
        get { return Dispatcher; }
    }

    IScheduler ISchedulerProvider.Immediate
    {
        get { return Immediate; }
    }

    IScheduler ISchedulerProvider.NewThread
    {
        get { return NewThread; }
    }

    IScheduler ISchedulerProvider.ThreadPool
    {
        get { return ThreadPool; }
    }

    IScheduler ISchedulerProvider.TaskPool
    {
        get { return TaskPool; }
    }

    #endregion
}

我尝试了多种启动调度程序的方法,在订阅之前和之后,尝试订阅另一个调度程序并在订阅之前启动它,但没有成功。

编辑:调试显示永远不会在 ObserveInactivity 方法中创建间隔。

希望有人能指出我正确的方向。

谢谢

所以,有机会回到这个问题上,我发现了我的方法中的错误。

Observable.Interval 从未启动,因为测试中没有报告任何 activity(问题出在那个命名有误的 activity 可观察序列上)。而在实际运行应用时,很可能至少已经引发过 1 个事件。为了解决这个问题,我按如下方式添加了 .StartWith(Unit.Default),这样每个观察者的超时都会立即开始计时;这显然是原始实现中的一个潜在错误。

/// <summary>
/// Emits a <see cref="Unit"/> every <paramref name="inactivityTimeout"/> for as long as no
/// new activity notification arrives; each notification restarts the interval.
/// </summary>
/// <param name="inactivityTimeout">Period of the interval that is (re)started on each notification.</param>
/// <returns>An observable sequence of inactivity notifications.</returns>
public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
{
    return GetInactivityObservable()
        // Seed one element so the first interval starts on subscription,
        // without waiting for an activity notification.
        .StartWith(Unit.Default)  // <--
        // The interval's tick value is discarded, so no Timestamp() step is needed.
        .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                .Select(__ => Unit.Default))
        // Switch() disposes the previous interval whenever a new one is created.
        .Switch();
}

我会重新组织您的 inactivity 查询:不要由 _activitySource.Activity 触发后再加上时间,而是预期在 XXX 时长内保持静默(failure/silence),并在静默期满时发出通知。

然后再扩展该查询:如果期间出现 activity,就取消并重新开始(cancel/restart)。

也就是说,不要使用

/// <summary>
/// Returns the shared activity stream: the <c>Activity</c> event sampled once per second
/// and projected to <see cref="Unit"/>. The observable is created on first call and cached.
/// </summary>
/// <returns>The cached, ref-counted observable of (sampled) activity notifications.</returns>
public IObservable<Unit> GetInactivityObservable()
{
    // IReportActivity.Activity is a non-generic EventHandler, so the delegate type
    // argument must be EventHandler for the += / -= lambdas to bind.
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(_ => Unit.Default)
            // Share a single event subscription between all observers.
            .Publish()
            .RefCount();
}

将其更改为

/// <summary>
/// Returns the shared inactivity stream: a <see cref="Unit"/> is emitted when one second
/// elapses after the latest sampled activity notification without a newer one arriving.
/// The observable is created on first call and cached.
/// </summary>
/// <returns>The cached, ref-counted observable of inactivity notifications.</returns>
public IObservable<Unit> GetInactivityObservable()
{
    // IReportActivity.Activity is a non-generic EventHandler, so the delegate type
    // argument must be EventHandler for the += / -= lambdas to bind.
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .Select(_ => Unit.Default)
            // Seed one element so a timer can start without any activity having been raised.
            .StartWith(Unit.Default)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            // Each sampled notification starts a new one-shot timer...
            .Select(activity => Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread).Select(_ => Unit.Default))
            // ...and Switch() cancels the previous timer, so only silence lets one fire.
            .Switch()
            .Publish()
            .RefCount();
}

然后,由于 Rx 本身就是惰性求值的,我会将此方法设为静态且无副作用,并在您的构造函数中调用它,或者干脆将其内联。

/// <summary>
/// Inactivity monitor whose query is built once, in the constructor. Rx queries are lazy,
/// so nothing is subscribed until an observer subscribes to the resulting observable.
/// </summary>
public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IObservable<Unit> _inactivityObservable;

    // How long the activity source must stay silent before inactivity is reported.
    private static readonly TimeSpan SilencePeriod = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Builds the inactivity query over <paramref name="activitySource"/>.
    /// </summary>
    /// <param name="activitySource">Object whose <c>Activity</c> event is observed.</param>
    /// <param name="schedulerProvider">Supplies the scheduler the silence timer runs on.</param>
    public InactivityMonitor(IReportActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _schedulerProvider = schedulerProvider;

        // The lambdas capture the constructor parameter; this version of the class keeps
        // no activity-source field. IReportActivity.Activity is a non-generic EventHandler,
        // hence the EventHandler delegate type argument.
        _inactivityObservable = Observable.FromEventPattern<EventHandler, EventArgs>(
                h => activitySource.Activity += h,
                h => activitySource.Activity -= h)
            .Select(_ => Unit.Default)
            // Seed a real element so the first timer starts on subscription. A bare
            // StartWith(null) would bind null to the params array, not to an element.
            .StartWith(Unit.Default)
            // Each notification starts a one-shot timer; Switch() cancels the previous one,
            // so a timer only fires after SilencePeriod without activity.
            .Select(_ => Observable.Timer(SilencePeriod, schedulerProvider.NewThread))
            .Switch()
            .Select(_ => Unit.Default)
            // Share a single event subscription between all observers.
            .Publish()
            .RefCount();
    }

    //...

}