Reactive Extensions - AsyncLock.Wait throws ArgumentNullException

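When using Observable.Interval, I have several times hit the following exception, which crashes the application. (The exception can only be found in the Event Viewer; error source: .NET Runtime.)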

Application: RxPlayground.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.ArgumentNullException
Stack:
   at System.Reactive.Concurrency.AsyncLock.Wait(System.Action)
   at System.Reactive.Concurrency.DefaultScheduler+<>c__DisplayClass9`1[[System.Int64, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].<SchedulePeriodic>b__6()
   at System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl+PeriodicTimer.Tick(System.Object)
   at System.Threading.TimerQueueTimer.CallCallbackInContext(System.Object)
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.TimerQueueTimer.CallCallback()
   at System.Threading.TimerQueueTimer.Fire()
   at System.Threading.TimerQueue.FireNextTimers()
   at System.Threading.TimerQueue.AppDomainTimerCallback()

Have you run into this problem when using Reactive Extensions? Do you know what might cause it?

Here is some code that may be triggering the problem (or may not, since I have been unable to reproduce it):

static void Main(string[] args)
{
    var subscriptions = SubscribeToObservables().Take(10000).ToArray();

    Console.WriteLine("BEGIN. Click Enter to Dispose...");
    Console.ReadLine();

    Console.WriteLine("DISPOSING");

    foreach (var subscription in subscriptions)
    {
        subscription.Dispose();
    }

    Console.WriteLine("DISPOSED. Click Enter to finish...");
    Console.ReadLine();

    Console.WriteLine("END");
}

private static IEnumerable<IDisposable> SubscribeToObservables()
{
    var x = Observable.Interval(TimeSpan.FromSeconds(1)).Select(n =>
    {
        //Thread.Sleep(TimeSpan.FromMinutes(1));
        return n;
    });

    var sub = x.Subscribe(
        n =>
        {
            Thread.Sleep(TimeSpan.FromMinutes(1));
            Console.WriteLine(n);
        });
    yield return sub;
}

Edit: I checked the AsyncLock class and all references to it (https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/AsyncLock.cs) and I cannot see any possibility that the Action delegate could ever be null! In the case of our exception, the stack trace shows that Wait was invoked from the following code (https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs):

public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
{
    if (period < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException("period");
    if (action == null)
        throw new ArgumentNullException("action");

    var state1 = state;
    var gate = new AsyncLock();

    var cancel = s_cal.StartPeriodicTimer(() =>
    {
        gate.Wait(() =>
        {
            state1 = action(state1);
        });
    }, period);

    return Disposable.Create(() =>
    {
        cancel.Dispose();
        gate.Dispose();
        action = Stubs<TState>.I;
    });
}

I checked in IL DASM that System.Reactive.Concurrency.DefaultScheduler+<>c__DisplayClass9`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].<SchedulePeriodic>b__6() is the compiled form of the following lambda:

() =>
{
    gate.Wait(() =>
    {
        state1 = action(state1);
    });
}

It looks as if the lambda () => { state1 = action(state1); } is being passed as null. But how is that possible?

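The exception can be reproduced. You have to pass a selector function that is simply null. Perhaps you forgot to initialize the function used in the Select block on every code path. Or the instance of the class no longer exists by the time the scheduler wants to run it. This is one possible way: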

Func<long, int> selector = null;
var x =  Observable.Interval(TimeSpan.FromSeconds(1)).Select(selector);
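
If a null delegate is a suspect, one way to rule it out is to validate the selector before the pipeline is built, so that any failure surfaces on the calling thread with a readable stack trace instead of on a timer thread. The following is only a minimal sketch under assumptions: SafeInterval and IntervalSelect are hypothetical names that are not part of Rx, and the code assumes the System.Reactive package is referenced.

using System;
using System.Reactive.Linq;

static class SafeInterval
{
    // Hypothetical helper (not part of Rx): rejects a null selector up front,
    // on the caller's thread, before any timer has been started.
    public static IObservable<TResult> IntervalSelect<TResult>(
        TimeSpan period, Func<long, TResult> selector)
    {
        if (selector == null)
            throw new ArgumentNullException("selector");

        return Observable.Interval(period).Select(selector);
    }
}

Usage would look like SafeInterval.IntervalSelect(TimeSpan.FromSeconds(1), n => (int)n). A selector that is null at this point then fails immediately at the call site rather than later inside the scheduler.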