Rx.net - 同步与异步观察者 - 取决于来源?

Rx.net - synchronous vs async observers - depends on source?

我是 Rx 的新手,正在努力了解它。没有读过很多书,但首先尝试通过动手实验。

class Program
{
    static void Main(string[] args)
    {
        // one source, produces values with delays
        IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100));
        IObserver<int> handler = null;

        IDisposable subscription = source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
                Thread.Sleep(500);
            },
            exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 1 Completed observation"));

        IDisposable subscription2 = source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
            exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 2 Completed observation"));

        Console.WriteLine("press to cancel");
        Console.ReadLine();
        subscription.Dispose();
        subscription2.Dispose();

    }
}

这会按预期产生异步交错执行。

另一方面,如果我将源更改为同步,即使观察者也会变得阻塞和同步(相同的线程 ID,在没有完全消耗 sub1 的情况下不会转到 sub2)。 有人可以帮我理解这个吗?这是同步版本

class Program
{
    static void Main(string[] args)
    {
        // one source, produces values
        IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i);
        IObserver<int> handler = null;

        // two observers that consume - first with a delay and the second immediately.
        // in this case, the behavior of the observers becomes synchronous? 
        IDisposable subscription = source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
                Thread.Sleep(500);
            },
            exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 1 Completed observation"));

        IDisposable subscription2 = source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
            exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 2 Completed observation"));

        Console.WriteLine("press to cancel");
        Console.ReadLine();
        subscription.Dispose();
        subscription2.Dispose();

    }
}

我认为原因是为操作员选择了默认值 IScheduler。看看接受的答案 here.

对于Generate,这取决于过载。根据答案,这些是使用的默认调度程序。如果你愿意,你可以验证它们的来源

  • 时间运算符的默认 ISchedulerDefaultScheduler.Instance
  • 后一个运算符的默认 ISchedulerCurrentThreadScheduler.Instance

您可以通过在您的同步版本中提供 "non-blocking" 调度程序来确认这一点

IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);