Rxjs 可观察到承诺从未在 nodejs 中解决

Rxjs observable to promise never resolved in nodejs

我已经对我正在使用的代码做了一个简化示例。

import * as rx from "rxjs";
import * as op from "rxjs/operators";

async function main(): Promise<void> {
    const blocker = new rx.ReplaySubject<0>();
    const subscription = rx.timer(0, 1000)
        .pipe(
            op.take(3),
            op.observeOn(rx.queueScheduler),
            op.subscribeOn(rx.queueScheduler)
        )
        .subscribe({
            next: x => console.log(`timer: next: [${x}]`),
            error: err => console.log(`timer: error: [${err}]`),
            complete: () => {
                console.log("timer: complete");
                blocker.next(0);
            }
        });
    const promise = rx.lastValueFrom(blocker.asObservable()
        .pipe(
            op.single(),
            op.observeOn(rx.queueScheduler),
            op.subscribeOn(rx.queueScheduler)
        ));
    console.log("prepared to await");
    await promise;
    console.log("awaited!");
    subscription.unsubscribe();
}

main()
    .then(
        () => console.log("all right"),
        reason => console.log(`rejected: [${reason}]`))
    .catch(err => console.log(`error! : ${err}`))
    .finally(() => console.log("done done done"));

除了“等待!”的部分,它可以正常工作(有点)。永远不会打印到控制台,以及 main 函数返回的承诺之后的任何行。

实际控制台输出为:

prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete

我期待的是:

prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
awaited!
all right

问题:

  1. 为什么会这样?这里涉及的 nodejs 'magic'(我假设有调度程序)是什么?能推荐一下这里涉及到的nodejs内部的书籍吗?
  2. 如何更改代码以达到预期的输出?

谢谢。

我不认为这是关于调度程序或 nodejs 的。 lastValueFrom 要求传递给它的可观察对象在解析返回的承诺之前完成。在这种情况下,它永远不会完成。

一个解决方案

试试这个:

blocker.next(0);
blocker.complete();

然后您应该得到预期的输出。否则 blocker 永远不会完成,这意味着它永远不会发出最后一个值。

另一个解决方案

替换

single()

take(1)

然后即使 blocker 本身没有完成,您交给 lastValueFrom 的可观察对象也会完成。这将解决你的承诺。

另一种解决方案

试试这个:

blocker.next(0);
blocker.next(1);

然后您应该得到预期的输出。虽然这次将是 singleSequenceError 解析您的承诺(而不是之前的示例,其中承诺以值 0 解析)。