call-order 跨多个 Rx Subject 是否保证了顺序?
Is ordering guaranteed by call-order cross multiple Rx Subjects?
在我下面的示例中,我有 2 个主题,并且 _primarySubject 总是在 _secondarySubject 之前被调用是否保证订阅者在辅助回调之前接收到主要回调?测试简单的测试似乎说是的还是侥幸?
如果这是侥幸,我该如何更改代码以保证我在此处描述的顺序。
非常感谢。
public class EventGenerator
{
ISubject<string> _primarySubject = new Subject<string>();
ISubject<int> _secondarySubject = new Subject<int>();
private int i;
public void SendEvent(string message)
{
_primarySubject.OnNext(message);
_secondarySubject.OnNext(i++);
}
public IObservable<string> GetPrimaryStream()
{
return _primarySubject;
}
public IObservable<int> GetSecondaryStream()
{
return _secondarySubject;
}
}
public class EventSubscriber
{
private static IScheduler StaticFakeGUIThread = new EventLoopScheduler(x => new Thread(x) { Name = "GUIThread" });
private readonly int _id;
public EventSubscriber(int id, EventGenerator eventGenerator)
{
_id = id;
eventGenerator.GetPrimaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnPrimaryEvent);
eventGenerator.GetSecondaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnSecondaryEvent);
}
private void OnPrimaryEvent(String eventMsg)
{
string msg = eventMsg;
}
private void OnSecondaryEvent(int i)
{
int msg = i;
}
}
[TestFixture]
public class EventGeneratorTest
{
[Test]
public void Test()
{
EventGenerator eg = new EventGenerator();
EventSubscriber s1 = new EventSubscriber(1,eg);
EventSubscriber s2 = new EventSubscriber(2, eg);
eg.SendEvent("A");
eg.SendEvent("B");
}
}
一般来说,Rx 不会对不同可观察流的相对顺序做出很多保证。订购取决于 RX 无法控制的许多因素。
在您的具体情况下,顺序是有保证的:
- 由于您按顺序同步触发您的主题,因此可以保证按顺序触发他们的即时订阅(
.ObserveOn(StaticFakeUIThread)
)。
- 由于您对
ObserveOn
的两次调用都使用相同的调度程序实例,因此可以保证对主流事件的观察将在对次要流事件的观察之前进行安排,并且两个观察都将被安排使用相同的调度程序
- 由于您的调度程序是
EventLoopScheduler
,因此可以保证它不会同时 运行 计划的任务,而是按计划的顺序 运行 它们.
因此您的观察确实运行有序。
将 EventLoopScheduler
替换为 TaskPoolScheduler
、ThreadPoolScheduler
或 Scheduler.Default
,这两个流的相对顺序将不再有任何保证,因为这些调度程序是允许 运行 同时执行两个计划任务。
这完全取决于您所说的 "guaranteed" 是什么意思。 Rx 的当前实现和编写的代码是这样的,调用顺序将始终如您所观察到的那样——但是,没有发布的规则来管理该行为和保证它。
您的代码编写方式没有提供使用两个主题的特别优势 - 或者实际上是两个订阅,因为处理程序是在同一个线程上调用的。您只需使用结合事件的单个主题即可获得所需的保证 - 为此使用自定义类型。为了方便起见,我使用了一个元组:
ISubject<Tuple<string,int>> _subject = new Subject<Tuple<string,int>>();
private int i;
public void SendEvent(string message)
{
_subject.OnNext(Tuple.Create(message,i++));
}
以及您的订阅:
private void OnEvent(Tuple<String,int> eventMsg)
{
string msgText = eventMsg.Item1;
int msgNum = eventMsg.Item2;
}
无论 Scheduler 等如何,这都会为您提供保证。我怀疑您的问题一定有更令人信服的理由 - 但否则效率更高。
如果您只是想使用一个计数器顺便说一句,Select
的重载为您引入了一个基于 0 的事件索引计数器。
理论上一切都应该按顺序进行。实际上,EventLoopScheduler 中存在一个错误,如果队列饥饿,该错误会破坏顺序。
在此处查看问题
https://github.com/Reactive-Extensions/Rx.NET/issues/455
在我下面的示例中,我有 2 个主题,并且 _primarySubject 总是在 _secondarySubject 之前被调用是否保证订阅者在辅助回调之前接收到主要回调?测试简单的测试似乎说是的还是侥幸?
如果这是侥幸,我该如何更改代码以保证我在此处描述的顺序。
非常感谢。
public class EventGenerator
{
ISubject<string> _primarySubject = new Subject<string>();
ISubject<int> _secondarySubject = new Subject<int>();
private int i;
public void SendEvent(string message)
{
_primarySubject.OnNext(message);
_secondarySubject.OnNext(i++);
}
public IObservable<string> GetPrimaryStream()
{
return _primarySubject;
}
public IObservable<int> GetSecondaryStream()
{
return _secondarySubject;
}
}
public class EventSubscriber
{
private static IScheduler StaticFakeGUIThread = new EventLoopScheduler(x => new Thread(x) { Name = "GUIThread" });
private readonly int _id;
public EventSubscriber(int id, EventGenerator eventGenerator)
{
_id = id;
eventGenerator.GetPrimaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnPrimaryEvent);
eventGenerator.GetSecondaryStream().ObserveOn(StaticFakeGUIThread).Subscribe(OnSecondaryEvent);
}
private void OnPrimaryEvent(String eventMsg)
{
string msg = eventMsg;
}
private void OnSecondaryEvent(int i)
{
int msg = i;
}
}
[TestFixture]
public class EventGeneratorTest
{
[Test]
public void Test()
{
EventGenerator eg = new EventGenerator();
EventSubscriber s1 = new EventSubscriber(1,eg);
EventSubscriber s2 = new EventSubscriber(2, eg);
eg.SendEvent("A");
eg.SendEvent("B");
}
}
一般来说,Rx 不会对不同可观察流的相对顺序做出很多保证。订购取决于 RX 无法控制的许多因素。
在您的具体情况下,顺序是有保证的:
- 由于您按顺序同步触发您的主题,因此可以保证按顺序触发他们的即时订阅(
.ObserveOn(StaticFakeUIThread)
)。 - 由于您对
ObserveOn
的两次调用都使用相同的调度程序实例,因此可以保证对主流事件的观察将在对次要流事件的观察之前进行安排,并且两个观察都将被安排使用相同的调度程序 - 由于您的调度程序是
EventLoopScheduler
,因此可以保证它不会同时 运行 计划的任务,而是按计划的顺序 运行 它们.
因此您的观察确实运行有序。
将 EventLoopScheduler
替换为 TaskPoolScheduler
、ThreadPoolScheduler
或 Scheduler.Default
,这两个流的相对顺序将不再有任何保证,因为这些调度程序是允许 运行 同时执行两个计划任务。
这完全取决于您所说的 "guaranteed" 是什么意思。 Rx 的当前实现和编写的代码是这样的,调用顺序将始终如您所观察到的那样——但是,没有发布的规则来管理该行为和保证它。
您的代码编写方式没有提供使用两个主题的特别优势 - 或者实际上是两个订阅,因为处理程序是在同一个线程上调用的。您只需使用结合事件的单个主题即可获得所需的保证 - 为此使用自定义类型。为了方便起见,我使用了一个元组:
ISubject<Tuple<string,int>> _subject = new Subject<Tuple<string,int>>();
private int i;
public void SendEvent(string message)
{
_subject.OnNext(Tuple.Create(message,i++));
}
以及您的订阅:
private void OnEvent(Tuple<String,int> eventMsg)
{
string msgText = eventMsg.Item1;
int msgNum = eventMsg.Item2;
}
无论 Scheduler 等如何,这都会为您提供保证。我怀疑您的问题一定有更令人信服的理由 - 但否则效率更高。
如果您只是想使用一个计数器顺便说一句,Select
的重载为您引入了一个基于 0 的事件索引计数器。
理论上一切都应该按顺序进行。实际上,EventLoopScheduler 中存在一个错误,如果队列饥饿,该错误会破坏顺序。
在此处查看问题 https://github.com/Reactive-Extensions/Rx.NET/issues/455