ReactiveX onComplete 在最后一个 onNotify 之前触发
ReactiveX onComplete triggered before the last onNotify
考虑以下管道:
- 将物品缓冲到包中
- 在线程池线程中观察这些包
- 对这些包做一些异步处理
如果进程已完成,将源可观察对象设置为 complete 将导致缓冲区按原样发出当前包。然而,处理部分将在获取最后一个包之前获取完整事件。
我的想法是等待最后一个包被处理但是因为我在 OnNext 之前得到了 OnComplete 我不能似乎是通过 ReactiveX 机制来实现的。
有什么方法可以让 OnComplete 在最后一次 OnNext 之后 发生?
这是模拟此行为的示例 (available as a .NET Fiddle):
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe()
;
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait();
这段代码解码起来有点乱...我认为您只是订阅了两次 publications
observable:
- 一旦您明确调用
Subscribe()
- 一旦您通过
FirstAsync()
隐式调用它
如果您重新排列如下。将您第一次订阅出版物的行替换为:
var tcs = new TaskCompletionSource<Unit>();
var nextpub = publications
.Do(x => Console.WriteLine("publications notify {0}", x),
() => Console.WriteLine("publications complete"))
.Subscribe(_ => {}, () => tcs.SetResult(Unit.Default));
删除带有 FirstAsync()
的行并将对 nextpub.Wait()
的调用替换为:
tcs.Task.Wait();
这不是编写 Rx 代码的推荐方式 - 只是修复您拥有的代码的最快方法。您通常应该在订阅者中处理您的结果,而不是阻止完成。例如:
SomeObservable.Subscribe(x => /* handle result */);
在 Rx 中,了解可观察对象的行为契约很重要。您将始终从内置运算符中获取此序列:
OnNext*(OnCompleted|OnError)?
因此,零个或多个(可能是无限个)"OnNext" 次调用后,可选地,"OnCompleted" 或 "OnError" 次调用。
你将 永远 在 "OnNext" 之前获得 "OnCompleted" - 对于单个可观察值。
现在,您的代码似乎 表现不同 - 但事实并非如此。
您实际上有两个独立的源可观察订阅。
这是一个订阅的样子:
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe(); /* Subscription #1 here! */
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
obs.OnCompleted();
这是另一个:
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait(); /* Subscription #2 here! */
如果我们只查看 "Subscription #1" 并将调度程序更改为 Scheduler.Immediate
,我们将获得以下执行顺序:
to buffer 1
to buffer 2
from buffer [1, 2]
to selectmany [1, 2]
> thread: 28
notify ()
publications notify ()
to buffer 3
to buffer complete
from buffer [3]
to selectmany [3]
> thread: 28
notify ()
publications notify ()
from buffer complete
to selectmany complete
complete
publications complete
这看起来仍然像我们在值出来之前得到 to buffer complete
。但这是误导。 .Do(...)
运算符专门用于允许将副作用引入 Rx 管道。所以它可能会让事情看起来没有按顺序发生,但是如果你在你创建的 Rx 管道中执行每一步,你会发现每一步都完美地遵循 "OnNext*(OnCompleted|OnError)" 契约。
您确实需要专注于管道中的单个步骤,您会发现一切都正确进行。
考虑以下管道:
- 将物品缓冲到包中
- 在线程池线程中观察这些包
- 对这些包做一些异步处理
如果进程已完成,将源可观察对象设置为 complete 将导致缓冲区按原样发出当前包。然而,处理部分将在获取最后一个包之前获取完整事件。
我的想法是等待最后一个包被处理但是因为我在 OnNext 之前得到了 OnComplete 我不能似乎是通过 ReactiveX 机制来实现的。
有什么方法可以让 OnComplete 在最后一次 OnNext 之后 发生?
这是模拟此行为的示例 (available as a .NET Fiddle):
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe()
;
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait();
这段代码解码起来有点乱...我认为您只是订阅了两次 publications
observable:
- 一旦您明确调用
Subscribe()
- 一旦您通过
FirstAsync()
隐式调用它
如果您重新排列如下。将您第一次订阅出版物的行替换为:
var tcs = new TaskCompletionSource<Unit>();
var nextpub = publications
.Do(x => Console.WriteLine("publications notify {0}", x),
() => Console.WriteLine("publications complete"))
.Subscribe(_ => {}, () => tcs.SetResult(Unit.Default));
删除带有 FirstAsync()
的行并将对 nextpub.Wait()
的调用替换为:
tcs.Task.Wait();
这不是编写 Rx 代码的推荐方式 - 只是修复您拥有的代码的最快方法。您通常应该在订阅者中处理您的结果,而不是阻止完成。例如:
SomeObservable.Subscribe(x => /* handle result */);
在 Rx 中,了解可观察对象的行为契约很重要。您将始终从内置运算符中获取此序列:
OnNext*(OnCompleted|OnError)?
因此,零个或多个(可能是无限个)"OnNext" 次调用后,可选地,"OnCompleted" 或 "OnError" 次调用。
你将 永远 在 "OnNext" 之前获得 "OnCompleted" - 对于单个可观察值。
现在,您的代码似乎 表现不同 - 但事实并非如此。
您实际上有两个独立的源可观察订阅。
这是一个订阅的样子:
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
publications
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
.Subscribe(); /* Subscription #1 here! */
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(3);
obs.OnCompleted();
这是另一个:
var publications =
obs
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
.Buffer(2)
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
.SelectMany(x => Test(x).ToEnumerable())
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
var nextpub = publications.FirstAsync();
obs.OnCompleted();
nextpub.Wait(); /* Subscription #2 here! */
如果我们只查看 "Subscription #1" 并将调度程序更改为 Scheduler.Immediate
,我们将获得以下执行顺序:
to buffer 1
to buffer 2
from buffer [1, 2]
to selectmany [1, 2]
> thread: 28
notify ()
publications notify ()
to buffer 3
to buffer complete
from buffer [3]
to selectmany [3]
> thread: 28
notify ()
publications notify ()
from buffer complete
to selectmany complete
complete
publications complete
这看起来仍然像我们在值出来之前得到 to buffer complete
。但这是误导。 .Do(...)
运算符专门用于允许将副作用引入 Rx 管道。所以它可能会让事情看起来没有按顺序发生,但是如果你在你创建的 Rx 管道中执行每一步,你会发现每一步都完美地遵循 "OnNext*(OnCompleted|OnError)" 契约。
您确实需要专注于管道中的单个步骤,您会发现一切都正确进行。