如何在不使用传统线程原语的情况下阻塞当前线程,直到 OnComplete 完成执行?
How do I block the current thread until OnComplete has finished executing without the use of traditional threading primitives?
如何在不使用线程原语的情况下阻塞当前线程,直到我的观察者的 OnComplete
处理程序完成?
这是我的代码。我希望 Console.WriteLine("Press...
语句仅在 OnComplete
处理程序后执行,即 ResetCount
已完成执行。
class Program
{
private static long totalItemCount = 0;
private static long listCount = 0;
static void Main()
{
Console.WriteLine($"Starting Main on Thread {Thread.CurrentThread.ManagedThreadId}\n");
var o = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1))
.Take(20)
.Concat(Observable.Interval(TimeSpan.FromSeconds(0.01)).Take(200))
.Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
o.Subscribe(Print, onCompleted: ResetCount);
// How I make sure this line appears only after the OnComplete has fired?
// Do I have to use traditional threading primitives such as wait handles?
// Or just cause the main thread to sleep long enough? That doesn't seem right.
Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
}
private static void ResetCount()
{
if (listCount > 0)
{
Console.WriteLine($"{totalItemCount} items processed in {listCount} lists.");
}
else
{
Console.WriteLine($"{totalItemCount} items processed.");
}
Interlocked.Exchange(ref totalItemCount, 0);
Interlocked.Exchange(ref listCount, 0);
}
static void Print<T>(T value)
{
var threadType = Thread.CurrentThread.IsBackground ? "Background" : "Foreground";
if (value is IList)
{
var list = value as IList;
Console.WriteLine($"{list.Count} items in list #{Interlocked.Increment(ref listCount)}:");
foreach (var item in list)
{
Console.WriteLine($"{item.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
}
Console.WriteLine();
}
else
{
Console.WriteLine($"{value.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
}
}
}
在 Rx 上,我们有特定的调度程序来处理线程、同步和相关问题。
您可以在此处阅读更多相关信息:
http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html
但基本上你要找的是改变这一行:
.Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), Scheduler.CurrentThread);
它们是 test/validate Rx 查询的几种方法。请记住,这并不是所有问题的答案。
如何在不使用线程原语的情况下阻塞当前线程,直到我的观察者的 OnComplete
处理程序完成?
这是我的代码。我希望 Console.WriteLine("Press...
语句仅在 OnComplete
处理程序后执行,即 ResetCount
已完成执行。
class Program
{
private static long totalItemCount = 0;
private static long listCount = 0;
static void Main()
{
Console.WriteLine($"Starting Main on Thread {Thread.CurrentThread.ManagedThreadId}\n");
var o = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1))
.Take(20)
.Concat(Observable.Interval(TimeSpan.FromSeconds(0.01)).Take(200))
.Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
o.Subscribe(Print, onCompleted: ResetCount);
// How I make sure this line appears only after the OnComplete has fired?
// Do I have to use traditional threading primitives such as wait handles?
// Or just cause the main thread to sleep long enough? That doesn't seem right.
Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
}
private static void ResetCount()
{
if (listCount > 0)
{
Console.WriteLine($"{totalItemCount} items processed in {listCount} lists.");
}
else
{
Console.WriteLine($"{totalItemCount} items processed.");
}
Interlocked.Exchange(ref totalItemCount, 0);
Interlocked.Exchange(ref listCount, 0);
}
static void Print<T>(T value)
{
var threadType = Thread.CurrentThread.IsBackground ? "Background" : "Foreground";
if (value is IList)
{
var list = value as IList;
Console.WriteLine($"{list.Count} items in list #{Interlocked.Increment(ref listCount)}:");
foreach (var item in list)
{
Console.WriteLine($"{item.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
}
Console.WriteLine();
}
else
{
Console.WriteLine($"{value.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
}
}
}
在 Rx 上,我们有特定的调度程序来处理线程、同步和相关问题。
您可以在此处阅读更多相关信息: http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html
但基本上你要找的是改变这一行:
.Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), Scheduler.CurrentThread);
它们是 test/validate Rx 查询的几种方法。请记住,这并不是所有问题的答案。