ReactiveX onComplete 在最后一个 onNotify 之前触发

ReactiveX onComplete triggered before the last onNotify

考虑以下管道:

  1. 将物品缓冲到包中
  2. 在线程池线程中观察这些包
  3. 对这些包做一些异步处理

如果进程已完成,将源可观察对象设置为 complete 将导致缓冲区按原样发出当前包。然而,处理部分将在获取最后一个包之前获取完整事件。

我的想法是等待最后一个包被处理但是因为我在 OnNext 之前得到了 OnComplete 我不能似乎是通过 ReactiveX 机制来实现的。

有什么方法可以让 OnComplete 在最后一次 OnNext 之后 发生?

这是模拟此行为的示例 (available as a .NET Fiddle):

    var publications = 
        obs
        .Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
        .Buffer(2)
        .Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
        .ObserveOn(ThreadPoolScheduler.Instance)
        .Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
        .SelectMany(x => Test(x).ToEnumerable())
        .Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));

    publications
        .Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
        .Subscribe()
        ;

    obs.OnNext(1);
    obs.OnNext(2);
    obs.OnNext(3);

    var nextpub = publications.FirstAsync();

    obs.OnCompleted();

    nextpub.Wait();

这段代码解码起来有点乱...我认为您只是订阅了两次 publications observable:

  • 一旦您明确调用 Subscribe()
  • 一旦您通过 FirstAsync()
  • 隐式调用它

如果您重新排列如下。将您第一次订阅出版物的行替换为:

var tcs = new TaskCompletionSource<Unit>();

var nextpub = publications
    .Do(x => Console.WriteLine("publications notify {0}", x),
        () => Console.WriteLine("publications complete"))
    .Subscribe(_ => {}, () => tcs.SetResult(Unit.Default));

删除带有 FirstAsync() 的行并将对 nextpub.Wait() 的调用替换为:

tcs.Task.Wait();

这不是编写 Rx 代码的推荐方式 - 只是修复您拥有的代码的最快方法。您通常应该在订阅者中处理您的结果,而不是阻止完成。例如:

SomeObservable.Subscribe(x => /* handle result */);

在 Rx 中,了解可观察对象的行为契约很重要。您将始终从内置运算符中获取此序列:

OnNext*(OnCompleted|OnError)?

因此,零个或多个(可能是无限个)"OnNext" 次调用后,可选地,"OnCompleted" 或 "OnError" 次调用。

你将 永远 在 "OnNext" 之前获得 "OnCompleted" - 对于单个可观察值。

现在,您的代码似乎 表现不同 - 但事实并非如此。

您实际上有两个独立的源可观察订阅。

这是一个订阅的样子:

    var publications = 
        obs
        .Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
        .Buffer(2)
        .Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
        .ObserveOn(ThreadPoolScheduler.Instance)
        .Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
        .SelectMany(x => Test(x).ToEnumerable())
        .Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));

    publications
        .Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
        .Subscribe(); /* Subscription #1 here! */

    obs.OnNext(1);
    obs.OnNext(2);
    obs.OnNext(3);

    obs.OnCompleted();

这是另一个:

    var publications = 
        obs
        .Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
        .Buffer(2)
        .Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
        .ObserveOn(ThreadPoolScheduler.Instance)
        .Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
        .SelectMany(x => Test(x).ToEnumerable())
        .Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));

    var nextpub = publications.FirstAsync();

    obs.OnCompleted();

    nextpub.Wait(); /* Subscription #2 here! */

如果我们只查看 "Subscription #1" 并将调度程序更改为 Scheduler.Immediate,我们将获得以下执行顺序:

to buffer 1
to buffer 2
from buffer [1, 2]
to selectmany [1, 2]
> thread: 28
notify ()
publications notify ()
to buffer 3
to buffer complete
from buffer [3]
to selectmany [3]
> thread: 28
notify ()
publications notify ()
from buffer complete
to selectmany complete
complete
publications complete

这看起来仍然像我们在值出来之前得到 to buffer complete。但这是误导。 .Do(...) 运算符专门用于允许将副作用引入 Rx 管道。所以它可能会让事情看起来没有按顺序发生,但是如果你在你创建的 Rx 管道中执行每一步,你会发现每一步都完美地遵循 "OnNext*(OnCompleted|OnError)" 契约。

您确实需要专注于管道中的单个步骤,您会发现一切都正确进行。