为什么在同一个线程上调用 Rx.Net OnNext 处理程序,忽略提供的调度程序
Why is Rx.Net OnNext handler invoked on the same thread, ignoring the scheduler provided
我对 RX.Net 调度程序的奇怪行为感到困惑(或者对这个概念的理解完全错误)。
有一个线程产生事件,我想在线程池中同时处理这些事件。但是所有的处理程序都是在同一个线程上调用的。似乎,用 ObserveOn()
设置的 sheduler 被忽略了。
这里是要重现的小场景:
private static void Main(string[] args)
{
Observable
.Create<int>(observer => Task.Run(() => Generate(observer)))
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(i =>
{
Thread.Sleep(TimeSpan.FromSeconds(i));
Console.WriteLine(
"Got {0} on thread \t{1}",
i,
Thread.CurrentThread.ManagedThreadId);
});
Console.ReadLine();
}
private static void Generate(IObserver<int> observer)
{
var i = 0;
while (true)
{
Console.WriteLine(
"Generated {0} on thread \t{1}",
i,
Thread.CurrentThread.ManagedThreadId);
observer.OnNext(i++);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
}
这是输出:
Generated 0 on thread 10
Got 0 on thread 12
Generated 1 on thread 10
Generated 2 on thread 10
Got 1 on thread 12
Generated 3 on thread 10
Got 2 on thread 12
Generated 4 on thread 10
Generated 5 on thread 10
Generated 6 on thread 10
Got 3 on thread 12
Generated 7 on thread 10
Generated 8 on thread 10
Generated 9 on thread 10
Generated 10 on thread 10
Got 4 on thread 12
您对 Rx 的工作原理有错误的概念。
有一个行为契约说可观察对象产生的值是序列化的。您应该始终期望从可观察对象中一个接一个地返回值,并且每个 .OnNext(...)
处理程序将在处理下一个值之前完成。
但是,您可以使用 .SelectMany(...)
使可观察对象并行处理数据。
虽然首先,您已经创建了一个 Generate
方法。你不需要。使用内置运算符始终是个好主意。它们已经过全面测试,因此您应该改用它们。碰巧有一个 Observable.Generate
方法你应该使用。要复制您的功能,您可以这样做:
Observable.Generate(0, i => true, i => i + 1, i => i, TaskPoolScheduler.Default)
所以现在它是因为很容易复制您拥有的代码:
var query =
from n in Observable.Generate(
0,
i => true,
i => i + 1,
i => i,
TaskPoolScheduler.Default)
from id in Observable.Start(() =>
{
Thread.Sleep(1000); /* simulated processing - should never do this otherwise */
return Thread.CurrentThread.ManagedThreadId;
})
select new { n, id };
如果我这样处理:
using (var subscription =
query
.Take(20)
.Subscribe(x =>
Console.WriteLine("Processed {0} on thread {1} and handled on {2}",
x.n.ToString().PadLeft(2, ' '),
x.id.ToString().PadLeft(2, ' '),
Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(2, ' '))))
{
Console.ReadLine();
}
我得到这些结果:
Processed 4 on thread 23 and handled on 23
Processed 2 on thread 20 and handled on 20
Processed 0 on thread 22 and handled on 22
Processed 6 on thread 17 and handled on 17
Processed 7 on thread 21 and handled on 21
Processed 3 on thread 19 and handled on 19
Processed 5 on thread 15 and handled on 15
Processed 1 on thread 24 and handled on 24
Processed 8 on thread 14 and handled on 14
Processed 12 on thread 17 and handled on 17
Processed 9 on thread 23 and handled on 23
Processed 14 on thread 19 and handled on 19
Processed 16 on thread 24 and handled on 24
Processed 10 on thread 20 and handled on 20
Processed 11 on thread 22 and handled on 22
Processed 15 on thread 15 and handled on 15
Processed 13 on thread 21 and handled on 21
Processed 17 on thread 13 and handled on 13
Processed 18 on thread 14 and handled on 14
Processed 19 on thread 17 and handled on 17
请注意 Observable.Generate
生成的值是如何乱序的?这就是实际的并行处理。
我对 RX.Net 调度程序的奇怪行为感到困惑(或者对这个概念的理解完全错误)。
有一个线程产生事件,我想在线程池中同时处理这些事件。但是所有的处理程序都是在同一个线程上调用的。似乎,用 ObserveOn()
设置的 sheduler 被忽略了。
这里是要重现的小场景:
private static void Main(string[] args)
{
Observable
.Create<int>(observer => Task.Run(() => Generate(observer)))
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(i =>
{
Thread.Sleep(TimeSpan.FromSeconds(i));
Console.WriteLine(
"Got {0} on thread \t{1}",
i,
Thread.CurrentThread.ManagedThreadId);
});
Console.ReadLine();
}
private static void Generate(IObserver<int> observer)
{
var i = 0;
while (true)
{
Console.WriteLine(
"Generated {0} on thread \t{1}",
i,
Thread.CurrentThread.ManagedThreadId);
observer.OnNext(i++);
Thread.Sleep(TimeSpan.FromSeconds(1));
}
}
这是输出:
Generated 0 on thread 10
Got 0 on thread 12
Generated 1 on thread 10
Generated 2 on thread 10
Got 1 on thread 12
Generated 3 on thread 10
Got 2 on thread 12
Generated 4 on thread 10
Generated 5 on thread 10
Generated 6 on thread 10
Got 3 on thread 12
Generated 7 on thread 10
Generated 8 on thread 10
Generated 9 on thread 10
Generated 10 on thread 10
Got 4 on thread 12
您对 Rx 的工作原理有错误的概念。
有一个行为契约说可观察对象产生的值是序列化的。您应该始终期望从可观察对象中一个接一个地返回值,并且每个 .OnNext(...)
处理程序将在处理下一个值之前完成。
但是,您可以使用 .SelectMany(...)
使可观察对象并行处理数据。
虽然首先,您已经创建了一个 Generate
方法。你不需要。使用内置运算符始终是个好主意。它们已经过全面测试,因此您应该改用它们。碰巧有一个 Observable.Generate
方法你应该使用。要复制您的功能,您可以这样做:
Observable.Generate(0, i => true, i => i + 1, i => i, TaskPoolScheduler.Default)
所以现在它是因为很容易复制您拥有的代码:
var query =
from n in Observable.Generate(
0,
i => true,
i => i + 1,
i => i,
TaskPoolScheduler.Default)
from id in Observable.Start(() =>
{
Thread.Sleep(1000); /* simulated processing - should never do this otherwise */
return Thread.CurrentThread.ManagedThreadId;
})
select new { n, id };
如果我这样处理:
using (var subscription =
query
.Take(20)
.Subscribe(x =>
Console.WriteLine("Processed {0} on thread {1} and handled on {2}",
x.n.ToString().PadLeft(2, ' '),
x.id.ToString().PadLeft(2, ' '),
Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(2, ' '))))
{
Console.ReadLine();
}
我得到这些结果:
Processed 4 on thread 23 and handled on 23
Processed 2 on thread 20 and handled on 20
Processed 0 on thread 22 and handled on 22
Processed 6 on thread 17 and handled on 17
Processed 7 on thread 21 and handled on 21
Processed 3 on thread 19 and handled on 19
Processed 5 on thread 15 and handled on 15
Processed 1 on thread 24 and handled on 24
Processed 8 on thread 14 and handled on 14
Processed 12 on thread 17 and handled on 17
Processed 9 on thread 23 and handled on 23
Processed 14 on thread 19 and handled on 19
Processed 16 on thread 24 and handled on 24
Processed 10 on thread 20 and handled on 20
Processed 11 on thread 22 and handled on 22
Processed 15 on thread 15 and handled on 15
Processed 13 on thread 21 and handled on 21
Processed 17 on thread 13 and handled on 13
Processed 18 on thread 14 and handled on 14
Processed 19 on thread 17 and handled on 17
请注意 Observable.Generate
生成的值是如何乱序的?这就是实际的并行处理。