为什么以下琐碎的 Rx.NET 不打印任何数字?

Why the following trivial Rx.NET does not print any numbers?

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        public static void Main()
        {
            Run().GetAwaiter().GetResult();
            Console.WriteLine("Press any key ...");
            Console.ReadKey();
        }

        private static async Task Run()
        {
            await Task.Delay(1);

            var dst = new Subject<int>();
            var res = dst
                .SelectMany(async arg =>
                {
                    await Task.Delay(1);
                    Console.WriteLine(arg);
                    return Unit.Default;
                })
                .DefaultIfEmpty();
            Observable.Range(0, 10).Subscribe(dst);

            await res;
        }
    }
}

我期望输出有 10 个数字,实际上我得到了 none。为什么?

您没有获得任何值,因为 Observable.Range(0, 10).Subscribe(dst); 在当前线程上运行直至完成,因此在调用 await resdst 没有任何新值.

您可以执行 (1) var dst = new ReplaySubject<int>(); 或 (2) Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(1.0)).Subscribe(dst); 从您的代码中获取值。

无论如何,这不是微不足道的 Rx。


我认为 Lee 的评论写得特别好,所以我认为值得在这里添加:

i.e. Observable.Range(0, 10).Subscribe(dst); pumps 10 values into the subject immediately. Then you invoke the subscription on res. The values are gone. Subject<T> doesn't cache values. It would be like raise an event 10 times then attaching an event handler.