SubscribeOn() 在这里做什么?
What is SubscribeOn() doing here?
我正在自学响应式编程,方法是解决随机问题并毫不羞愧地问愚蠢的新手问题。在弄清楚线程调度的工作原理时,我设法难住了自己。虽然我很确定这段代码没有逻辑意义,但我也无法理解发生了什么。弄清楚这一点可能会帮助我。这是代码:
var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();
var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1), () => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2), () => emitter.OnCompleted());
var subscription = emitter.SubscribeOn(newThreadScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
Console.WriteLine("DONE.");
Console.ReadLine();
我预期可能是:
one
DONE.
Complete!
有可能交错,因为我不太确定 SubscribeOn() 会做什么。我得到的是:
DONE.
Complete!
这里到底发生了什么?为什么项目在完成之前没有生产? ObserveOn() 在这种情况下按我预期的方式工作,我明白为什么:它是 运行 其他线程上的委托,它们可以与 "DONE." 交错那么 SubscribeOn() 到底在做什么?
这里只是一个竞争条件。
如果我们将所有代码都撕回
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();
var subscription = emitter
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
Console.WriteLine("DONE.");
Console.ReadLine();
我们得到了相同的结果。
通过使用 Subject<T>
您将不会获得任何缓存行为,但 OnCompleted
通知除外。
SubscribeOn
操作员将安排任何订阅工作在提供的 IScheduler
实例上完成。
在订阅 Subject<T>
的情况下,几乎没有工作要做。
它几乎就像将回调注册到回调列表一样简单。
将工作安排到 NewThreadScheduler
将创建一个新线程,然后创建一个内部事件循环来处理计划的工作。
这非常快,但确实需要创建一个新线程,一个 EventloopScheduler 并执行到新线程的上下文切换。
在您的示例中,您将 OnNext
和 OnCompleted
通知安排在 TestScheduler
上。
然后你 SubscribeOn
和 NewThreadScheduler
。
之后,您开始处理 TestScheduler
实例的所有计划工作。
这些 virtually 计划项目的处理只是迭代计划项目,执行委托并推进虚拟时钟。
这速度快得令人难以置信。
更具体地说,下面的代码类似于您编写的代码
var newThreadScheduler = new NewThreadScheduler();
var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));
foreach (var callback in callbacks)
{
callback("one");
}
Console.WriteLine("Done");
这里我们只有一个回调操作列表(称它们为订阅者或观察者)。
然后,我们在新线程上异步安排添加这些回调之一。
然后立即迭代回调并将字符串 "one" 发送给每个回调。
结果是
Done
只是没有给 NewThreadScheduler
足够的时间来启动新线程、安排操作,然后在主线程迭代集合之前执行该操作。
因此,我认为您没有遵循以下几条准则:
1)避免主题;-)
2) 不要混合线程和单元测试。我假设 TestScheduler
的存在是因为您正在测试它。但是,您可以使用 TestScheduler
的两个实例,例如背景和前景实例。
为了更有帮助,我会提供积极的指导,建议您从测试中删除第二个调度程序。
在 SubscribeOn
运算符中使用 TestScheduler
实例。
接下来我建议使用 TestScheduler
的 Observable 序列工厂方法,即 CreateColdObservable
来替换 subjects+scheduling 的使用。
最后我不知道前进到 1s 的特定时间是否比仅使用 Start
方法有任何收获。
我认为这会减少噪音和使用魔法值 1s。
var testScheduler = new TestScheduler();
var source = testScheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));
var subscription = source.SubscribeOn(testScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.Start();
Console.WriteLine("DONE.");
Console.ReadLine();
现在唯一的问题是 SubscribeOn
调用非常多余。
仅供参考:NewThreadScheduler
的代码 -
https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs
我正在自学响应式编程,方法是解决随机问题并毫不羞愧地问愚蠢的新手问题。在弄清楚线程调度的工作原理时,我设法难住了自己。虽然我很确定这段代码没有逻辑意义,但我也无法理解发生了什么。弄清楚这一点可能会帮助我。这是代码:
var testScheduler = new TestScheduler();
var newThreadScheduler = new NewThreadScheduler();
var emitter = new Subject<string>();
testScheduler.Schedule(TimeSpan.FromSeconds(0.1), () => emitter.OnNext("one"));
testScheduler.Schedule(TimeSpan.FromSeconds(0.2), () => emitter.OnCompleted());
var subscription = emitter.SubscribeOn(newThreadScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
Console.WriteLine("DONE.");
Console.ReadLine();
我预期可能是:
one
DONE.
Complete!
有可能交错,因为我不太确定 SubscribeOn() 会做什么。我得到的是:
DONE.
Complete!
这里到底发生了什么?为什么项目在完成之前没有生产? ObserveOn() 在这种情况下按我预期的方式工作,我明白为什么:它是 运行 其他线程上的委托,它们可以与 "DONE." 交错那么 SubscribeOn() 到底在做什么?
这里只是一个竞争条件。
如果我们将所有代码都撕回
var emitter = new Subject<string>();
emitter.OnNext("one");
emitter.OnCompleted();
var subscription = emitter
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
Console.WriteLine("DONE.");
Console.ReadLine();
我们得到了相同的结果。
通过使用 Subject<T>
您将不会获得任何缓存行为,但 OnCompleted
通知除外。
SubscribeOn
操作员将安排任何订阅工作在提供的 IScheduler
实例上完成。
在订阅 Subject<T>
的情况下,几乎没有工作要做。
它几乎就像将回调注册到回调列表一样简单。
将工作安排到 NewThreadScheduler
将创建一个新线程,然后创建一个内部事件循环来处理计划的工作。
这非常快,但确实需要创建一个新线程,一个 EventloopScheduler 并执行到新线程的上下文切换。
在您的示例中,您将 OnNext
和 OnCompleted
通知安排在 TestScheduler
上。
然后你 SubscribeOn
和 NewThreadScheduler
。
之后,您开始处理 TestScheduler
实例的所有计划工作。
这些 virtually 计划项目的处理只是迭代计划项目,执行委托并推进虚拟时钟。
这速度快得令人难以置信。
更具体地说,下面的代码类似于您编写的代码
var newThreadScheduler = new NewThreadScheduler();
var callbacks = new List<Action<string>>();
newThreadScheduler.Schedule(()=>callbacks.Add(str=>Console.WriteLine(str)));
foreach (var callback in callbacks)
{
callback("one");
}
Console.WriteLine("Done");
这里我们只有一个回调操作列表(称它们为订阅者或观察者)。 然后,我们在新线程上异步安排添加这些回调之一。 然后立即迭代回调并将字符串 "one" 发送给每个回调。 结果是
Done
只是没有给 NewThreadScheduler
足够的时间来启动新线程、安排操作,然后在主线程迭代集合之前执行该操作。
因此,我认为您没有遵循以下几条准则:
1)避免主题;-)
2) 不要混合线程和单元测试。我假设 TestScheduler
的存在是因为您正在测试它。但是,您可以使用 TestScheduler
的两个实例,例如背景和前景实例。
为了更有帮助,我会提供积极的指导,建议您从测试中删除第二个调度程序。
在 SubscribeOn
运算符中使用 TestScheduler
实例。
接下来我建议使用 TestScheduler
的 Observable 序列工厂方法,即 CreateColdObservable
来替换 subjects+scheduling 的使用。
最后我不知道前进到 1s 的特定时间是否比仅使用 Start
方法有任何收获。
我认为这会减少噪音和使用魔法值 1s。
var testScheduler = new TestScheduler();
var source = testScheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(TimeSpan.FromSeconds(0.1).Ticks, "one"),
ReactiveTest.OnCompleted<string>(TimeSpan.FromSeconds(0.2).Ticks));
var subscription = source.SubscribeOn(testScheduler)
.Subscribe(
item => Console.WriteLine(item),
error => Console.WriteLine(error),
() => Console.WriteLine("Complete!")
);
testScheduler.Start();
Console.WriteLine("DONE.");
Console.ReadLine();
现在唯一的问题是 SubscribeOn
调用非常多余。
仅供参考:NewThreadScheduler
的代码 -
https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/NewThreadScheduler.cs