与多个 IEnumerables 一起使用时 Observable.Subscribe 的行为
Behavior of Observable.Subscribe when used with multiple IEnumerables
我正在尝试从两个数组 (IEnumerable
s) 创建一个 IObservable<T>
。我试图避免显式迭代数组并调用 observer.OnNext
。我遇到了 Observable.Subscribe 扩展方法,乍一看似乎是我需要的。然而,它并没有像我预期的那样工作,我不知道为什么。
以下代码为例:
class Program
{
static void Main(string[] args)
{
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] {'A'};
var secondBytes = new[] {'Z', 'Y'};
firstBytes.Subscribe(observer);
secondBytes.Subscribe(observer);
return Disposable.Empty;
}
);
observable.Subscribe(b => Console.Write(b));
}
}
它的输出是 "AZ",而不是我预期的 "AZY"。现在,如果我在 firstBytes
之前订阅 secondBytes
,输出是 "ZAY"!这似乎表明它正在同步枚举两个数组——这解释了 "AZ" 输出。
无论如何,我完全不知道它为什么会这样,希望能提供任何见解。
因为您订阅了两个 observables,而不是将两个observables串联的单个observable,所以有两个可能的来源可以调用观察者的OnComplete
方法。由于第一个数组较短,它在发出第一个项目后完成,并且观察者在收到完成通知后取消订阅。
正确的做法是将两个序列组合成一个序列,然后订阅它:
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] { 'A' };
var secondBytes = new[] { 'Z', 'Y' };
return firstBytes.Concat(secondBytes).Subscribe(observer);
});
observable.Subscribe(Console.Write);
锁步迭代行为的原因可以通过 Observable.Subscribe(IEnumerable source) which uses a "recursive" algorithm 的实现来解释,它通过在调度程序操作中调用 e.MoveNext
来工作。如果成功,则发出该值,然后新调度程序操作排队以从可枚举中读取下一个值。
由于您订阅了两个可枚举对象并且没有为订阅指定任何特定的调度程序,因此默认迭代调度程序将用于这些操作(由 SchedulerDefaults.Iteration
定义),默认为 运行在当前线程上运行。这意味着在您当前的订阅操作完成后,枚举操作将排队到 运行。这导致枚举操作交错 - 像这样
- firstBytes.Subscribe() -> 队列枚举操作
- secondBytes.Subscribe() -> 队列枚举操作
- 调用firstBytes.MoveNext() -> OnNext("A") -> 排队下一个枚举动作
- 调用secondBytes.MoveNext() -> OnNext("Z") -> 排队下一个枚举动作
- 调用 firstBytes.MoveNext() -> OnCompleted()
- 调用secondBytes.MoveNext() -> OnNext(Y) -> 排队下一个枚举动作
- 调用 secondBytes.MoveNext() -> OnCompleted()
观察者在第 5 步收到 OnCompleted() 通知,因此忽略剩余的 secondBytes 枚举步骤。如果您退回了您的订阅一次性用品,那么此时将取消第二次枚举。
我正在尝试从两个数组 (IEnumerable
s) 创建一个 IObservable<T>
。我试图避免显式迭代数组并调用 observer.OnNext
。我遇到了 Observable.Subscribe 扩展方法,乍一看似乎是我需要的。然而,它并没有像我预期的那样工作,我不知道为什么。
以下代码为例:
class Program
{
static void Main(string[] args)
{
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] {'A'};
var secondBytes = new[] {'Z', 'Y'};
firstBytes.Subscribe(observer);
secondBytes.Subscribe(observer);
return Disposable.Empty;
}
);
observable.Subscribe(b => Console.Write(b));
}
}
它的输出是 "AZ",而不是我预期的 "AZY"。现在,如果我在 firstBytes
之前订阅 secondBytes
,输出是 "ZAY"!这似乎表明它正在同步枚举两个数组——这解释了 "AZ" 输出。
无论如何,我完全不知道它为什么会这样,希望能提供任何见解。
因为您订阅了两个 observables,而不是将两个observables串联的单个observable,所以有两个可能的来源可以调用观察者的OnComplete
方法。由于第一个数组较短,它在发出第一个项目后完成,并且观察者在收到完成通知后取消订阅。
正确的做法是将两个序列组合成一个序列,然后订阅它:
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] { 'A' };
var secondBytes = new[] { 'Z', 'Y' };
return firstBytes.Concat(secondBytes).Subscribe(observer);
});
observable.Subscribe(Console.Write);
锁步迭代行为的原因可以通过 Observable.Subscribe(IEnumerable source) which uses a "recursive" algorithm 的实现来解释,它通过在调度程序操作中调用 e.MoveNext
来工作。如果成功,则发出该值,然后新调度程序操作排队以从可枚举中读取下一个值。
由于您订阅了两个可枚举对象并且没有为订阅指定任何特定的调度程序,因此默认迭代调度程序将用于这些操作(由 SchedulerDefaults.Iteration
定义),默认为 运行在当前线程上运行。这意味着在您当前的订阅操作完成后,枚举操作将排队到 运行。这导致枚举操作交错 - 像这样
- firstBytes.Subscribe() -> 队列枚举操作
- secondBytes.Subscribe() -> 队列枚举操作
- 调用firstBytes.MoveNext() -> OnNext("A") -> 排队下一个枚举动作
- 调用secondBytes.MoveNext() -> OnNext("Z") -> 排队下一个枚举动作
- 调用 firstBytes.MoveNext() -> OnCompleted()
- 调用secondBytes.MoveNext() -> OnNext(Y) -> 排队下一个枚举动作
- 调用 secondBytes.MoveNext() -> OnCompleted()
观察者在第 5 步收到 OnCompleted() 通知,因此忽略剩余的 secondBytes 枚举步骤。如果您退回了您的订阅一次性用品,那么此时将取消第二次枚举。