为什么重复 Enumerable 到 Observable 转换块
Why does repeated Enumerable to Observable conversion block
这是一个很有教育意义的问题,出于好奇。考虑以下片段:
var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
其中 SubscribeDebug
订阅了一个简单的观察者:
public class DebugObserver<T> : IObserver<T>
{
public void OnCompleted()
{
Debug.WriteLine("Completed");
}
public void OnError(Exception error)
{
Debug.WriteLine("Error");
}
public void OnNext(T value)
{
Debug.WriteLine("Value: {0}", value);
}
}
这个输出是:
Value: 0
Value: 1
Value: 2
Value: 3
Value: 4
然后阻塞。有人可以帮助我理解它发生的根本原因以及可观察对象未完成的原因吗?我注意到它在没有 Concat
调用的情况下确实完成了,但是被它阻塞了。
我查看了 ToObservable
的 the source 并提取了一个最小的实现。它确实重现了我们所看到的行为。
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
}
else
{
observer.OnCompleted();
}
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
除此之外 - 问题的症结在于 CurrentThreadScheduler
的行为,这是使用的默认调度程序。
CurrentThreadScheduler
的行为是,如果调度 已经 运行 而 Schedule
正在被调用 - 它最终会被排队.
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
这会打印 2 1
。这种排队行为是我们的毁灭。
当调用 observer.OnCompleted()
时,它会导致 Concat
开始下一个枚举 - 然而,情况与我们开始时不一样 - 我们仍在 observer => { }
中当我们尝试安排下一个时阻塞。因此,下一个计划不是立即执行,而是排队。
现在 enumerator.MoveNext()
陷入了僵局。
它无法移动到下一个项目 - MoveNext
在下一个项目 到达 之前阻塞 - 它只能在 ToObservable
循环安排时到达。
但是调度程序只能通知 ToEnumerable
和随后被阻止的 MoveNext()
- 一旦它退出 loopRec
- 它不能因为它被阻止MoveNext
第一名。
附录
这大约是 ToEnumerable
(来自 GetEnumerator.cs)所做的(不是有效的实现):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait(); //this is where it blocks
if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}
Enumerables 预计会阻塞,直到产生下一个项目 - 这就是为什么有门控实现的原因。阻塞的不是 Enumerable.Range
,而是 ToEnumerable
.
这是一个很有教育意义的问题,出于好奇。考虑以下片段:
var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
其中 SubscribeDebug
订阅了一个简单的观察者:
public class DebugObserver<T> : IObserver<T>
{
public void OnCompleted()
{
Debug.WriteLine("Completed");
}
public void OnError(Exception error)
{
Debug.WriteLine("Error");
}
public void OnNext(T value)
{
Debug.WriteLine("Value: {0}", value);
}
}
这个输出是:
Value: 0
Value: 1
Value: 2
Value: 3
Value: 4
然后阻塞。有人可以帮助我理解它发生的根本原因以及可观察对象未完成的原因吗?我注意到它在没有 Concat
调用的情况下确实完成了,但是被它阻塞了。
我查看了 ToObservable
的 the source 并提取了一个最小的实现。它确实重现了我们所看到的行为。
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
}
else
{
observer.OnCompleted();
}
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
除此之外 - 问题的症结在于 CurrentThreadScheduler
的行为,这是使用的默认调度程序。
CurrentThreadScheduler
的行为是,如果调度 已经 运行 而 Schedule
正在被调用 - 它最终会被排队.
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
这会打印 2 1
。这种排队行为是我们的毁灭。
当调用 observer.OnCompleted()
时,它会导致 Concat
开始下一个枚举 - 然而,情况与我们开始时不一样 - 我们仍在 observer => { }
中当我们尝试安排下一个时阻塞。因此,下一个计划不是立即执行,而是排队。
现在 enumerator.MoveNext()
陷入了僵局。
它无法移动到下一个项目 - MoveNext
在下一个项目 到达 之前阻塞 - 它只能在 ToObservable
循环安排时到达。
但是调度程序只能通知 ToEnumerable
和随后被阻止的 MoveNext()
- 一旦它退出 loopRec
- 它不能因为它被阻止MoveNext
第一名。
附录
这大约是 ToEnumerable
(来自 GetEnumerator.cs)所做的(不是有效的实现):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait(); //this is where it blocks
if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}
Enumerables 预计会阻塞,直到产生下一个项目 - 这就是为什么有门控实现的原因。阻塞的不是 Enumerable.Range
,而是 ToEnumerable
.