如何使 NewThreadScheduler.Default 计划在不同的线程上工作
How to make NewThreadScheduler.Default schedules work on differnet threads
试图让我全神贯注于 Rx,以下代码似乎无法像我希望的那样在不同的线程上运行 运行。我错过了什么?
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace OverviewConsoleApp
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Thread {0}", Thread.CurrentThread.ManagedThreadId);
var query = Enumerable.Range(1, 5).Select(n => n);
IObservable<int> observableQuery = query.ToObservable(NewThreadScheduler.Default);
observableQuery.Subscribe(ProcessNumber, ImDone);
}
static void ProcessNumber(int number)
{
Console.WriteLine("{0} Thread {1}", number, Thread.CurrentThread.ManagedThreadId);
}
static void ImDone()
{
Console.WriteLine("I am done!");
}
}
}
NewThreadScheduler.Default 将所有工作安排在同一线程上。我正在尝试将它们安排在不同的线程上。
我看过这个SO answer,但是那里建议的答案似乎已经过时了,因为IEnumerable 不再有Do 方法。
有人可以帮助我如何 运行 它们在不同的线程上吗?
必须安装 System.Reactive nuget 软件包才能将上述内容安装到 运行。
对于一个简单的问题,答案并不完全是这样。
ToObservable
本质上检查称为 ScheduleLongRunning
的调度程序优化是否可用 - 顾名思义,就是 运行 一个可能 运行 很长时间的任务从而阻止调度程序。否则,它必须递归地安排枚举,这是一个效率更低的顺序。
NewThreadScheduler
来了,它是支持 ScheduleLongRunning
的理想人选 - 它只需要 运行 整个 thunk 在一个新线程上。
最终结果是最终所有工作都安排在 一个 线程中。
最后,NewThreadScheduler
对于枚举来说有点矫枉过正,因此您可能想要切换到 TaskPoolScheduler
。但是等等,同样的优化 运行 是 one Task
的全部内容。
所以ToObservable(TaskPoolScheduler.Default.DisableOptimizations())
.
P.S.
与问题无关,但是 Do
方法以及一大堆有用的运算符都在 System.Interactive
包中。如果你不想安装整个包,Do
就是
public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> sideEffect)
{
foreach (var value in source)
{
sideEffect(value);
yield return value;
}
}
someEnumerable.ToObservable(scheduler)
将可观察对象的所有项目调度到该调度程序上的 运行。如果你想安排每个单独的项目,你必须把每个项目变成它自己的可观察对象。以下是这样做的:
IObservable<int> observableQuery = query
.ToObservable()
.SelectMany(i => Observable.Return(i).ObserveOn(NewThreadScheduler.Default /*Or TaskPoolScheduler.Default as Asti mentioned */));
试图让我全神贯注于 Rx,以下代码似乎无法像我希望的那样在不同的线程上运行 运行。我错过了什么?
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace OverviewConsoleApp
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Thread {0}", Thread.CurrentThread.ManagedThreadId);
var query = Enumerable.Range(1, 5).Select(n => n);
IObservable<int> observableQuery = query.ToObservable(NewThreadScheduler.Default);
observableQuery.Subscribe(ProcessNumber, ImDone);
}
static void ProcessNumber(int number)
{
Console.WriteLine("{0} Thread {1}", number, Thread.CurrentThread.ManagedThreadId);
}
static void ImDone()
{
Console.WriteLine("I am done!");
}
}
}
NewThreadScheduler.Default 将所有工作安排在同一线程上。我正在尝试将它们安排在不同的线程上。
我看过这个SO answer,但是那里建议的答案似乎已经过时了,因为IEnumerable 不再有Do 方法。 有人可以帮助我如何 运行 它们在不同的线程上吗?
必须安装 System.Reactive nuget 软件包才能将上述内容安装到 运行。
对于一个简单的问题,答案并不完全是这样。
ToObservable
本质上检查称为 ScheduleLongRunning
的调度程序优化是否可用 - 顾名思义,就是 运行 一个可能 运行 很长时间的任务从而阻止调度程序。否则,它必须递归地安排枚举,这是一个效率更低的顺序。
NewThreadScheduler
来了,它是支持 ScheduleLongRunning
的理想人选 - 它只需要 运行 整个 thunk 在一个新线程上。
最终结果是最终所有工作都安排在 一个 线程中。
最后,NewThreadScheduler
对于枚举来说有点矫枉过正,因此您可能想要切换到 TaskPoolScheduler
。但是等等,同样的优化 运行 是 one Task
的全部内容。
所以ToObservable(TaskPoolScheduler.Default.DisableOptimizations())
.
P.S.
与问题无关,但是 Do
方法以及一大堆有用的运算符都在 System.Interactive
包中。如果你不想安装整个包,Do
就是
public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> sideEffect)
{
foreach (var value in source)
{
sideEffect(value);
yield return value;
}
}
someEnumerable.ToObservable(scheduler)
将可观察对象的所有项目调度到该调度程序上的 运行。如果你想安排每个单独的项目,你必须把每个项目变成它自己的可观察对象。以下是这样做的:
IObservable<int> observableQuery = query
.ToObservable()
.SelectMany(i => Observable.Return(i).ObserveOn(NewThreadScheduler.Default /*Or TaskPoolScheduler.Default as Asti mentioned */));