如何在不使用传统线程原语的情况下阻塞当前线程,直到 OnComplete 完成执行?

How do I block the current thread until OnComplete has finished executing without the use of traditional threading primitives?

如何在不使用线程原语的情况下阻塞当前线程,直到我的观察者的 OnComplete 处理程序完成?

这是我的代码。我希望 Console.WriteLine("Press... 语句仅在 OnComplete 处理程序后执行,即 ResetCount 已完成执行。

class Program
{
    private static long totalItemCount = 0;
    private static long listCount = 0;

    static void Main()
    {
        Console.WriteLine($"Starting Main on Thread {Thread.CurrentThread.ManagedThreadId}\n");

        var o = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1))
            .Take(20)
            .Concat(Observable.Interval(TimeSpan.FromSeconds(0.01)).Take(200))
            .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

        o.Subscribe(Print, onCompleted: ResetCount);


        // How I make sure this line appears only after the OnComplete has fired?
        // Do I have to use traditional threading primitives such as wait handles?
        // Or just cause the main thread to sleep long enough? That doesn't seem right.
        Console.WriteLine("\nPress any key to exit...");
        Console.ReadKey();
    }

    private static void ResetCount()
    {
        if (listCount > 0)
        {
            Console.WriteLine($"{totalItemCount} items processed in {listCount} lists.");
        }
        else
        {
            Console.WriteLine($"{totalItemCount} items processed.");
        }

        Interlocked.Exchange(ref totalItemCount, 0);
        Interlocked.Exchange(ref listCount, 0);
    }

    static void Print<T>(T value)
    {
        var threadType = Thread.CurrentThread.IsBackground ? "Background" : "Foreground";

        if (value is IList)
        {
            var list = value as IList;
            Console.WriteLine($"{list.Count} items in list #{Interlocked.Increment(ref listCount)}:");

            foreach (var item in list)
            {
                Console.WriteLine($"{item.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
            }
            Console.WriteLine();
        }
        else
        {
            Console.WriteLine($"{value.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
        }
    }
}

在 Rx 上,我们有特定的调度程序来处理线程、同步和相关问题。

您可以在此处阅读更多相关信息: http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html

但基本上你要找的是改变这一行:

 .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), Scheduler.CurrentThread);

它们是 test/validate Rx 查询的几种方法。请记住,这并不是所有问题的答案。