反应性扩展第一个节流值

Reactive Extensions First Throttled Value

我想 return 来自节流流的第一个值。

在下面的代码中,我预计值 2 将在大约 4 秒后被 returned。

相反,我看到的是值 2 在大约 11 秒后被 returned。在 Events 可枚举完成之前,整个可观察对象不会完成。为什么 FirstAsync 不早点退出这里?我怎样才能让它那样工作?

如果我移除油门线,只有事件 1 会写入控制台,因此我认为它与导致不同行为的事件有关。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp11
{
    class Program
    {
        static void Main()
        {
            static IEnumerable<int> Events()
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 1");
                yield return 1;

                Thread.Sleep(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 2");
                yield return 2;

                Thread.Sleep(TimeSpan.FromSeconds(9));
                Console.WriteLine("Event 3");
                yield return 3;
            }

            var stopwatch = new Stopwatch();

            stopwatch.Start();
            var result = Events()
                .ToObservable()
                .Throttle(TimeSpan.FromSeconds(2))
                .Select(i =>
                {
                    Console.WriteLine($"Window complete ({i})");
                    return i;
                })
                .FirstAsync()
                .Wait();
            stopwatch.Stop();

            Console.WriteLine($"Observable complete ({result}): {stopwatch.Elapsed}");
            Console.ReadLine();
        }
    }
}

输出如下:

Event 1
Event 2
Window complete (2)
Event 3
Observable complete (2): 00:00:11.1100222

该生成器正在阻止 ToObservable

中的代码

试试这个:

using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace ConsoleApp11
{
    class Program
    {
        static async Task Main()
        {
            var events = Observable.Create<int>(async sub =>
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 1");
                sub.OnNext(1);

                await Task.Delay(TimeSpan.FromSeconds(1));
                Console.WriteLine("Event 2");
                sub.OnNext(2);

                await Task.Delay(TimeSpan.FromSeconds(9));
                Console.WriteLine("Event 3");
                sub.OnNext(3);

                return Disposable.Empty;
            });

          
            var stopwatch = new Stopwatch();

            stopwatch.Start();
            var result = await events
                .Throttle(TimeSpan.FromSeconds(2))
                .Select(i =>
                {
                    Console.WriteLine($"Window complete ({i})");
                    return i;
                })
                .FirstAsync();
            stopwatch.Stop();

            Console.WriteLine($"Observable complete ({result}): {stopwatch.Elapsed}");
            Console.ReadLine();
        }
    }
}