Rxjs 可观察到承诺从未在 nodejs 中解决
Rxjs observable to promise never resolved in nodejs
我已经对我正在使用的代码做了一个简化示例。
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function main(): Promise<void> {
const blocker = new rx.ReplaySubject<0>();
const subscription = rx.timer(0, 1000)
.pipe(
op.take(3),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
)
.subscribe({
next: x => console.log(`timer: next: [${x}]`),
error: err => console.log(`timer: error: [${err}]`),
complete: () => {
console.log("timer: complete");
blocker.next(0);
}
});
const promise = rx.lastValueFrom(blocker.asObservable()
.pipe(
op.single(),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
));
console.log("prepared to await");
await promise;
console.log("awaited!");
subscription.unsubscribe();
}
main()
.then(
() => console.log("all right"),
reason => console.log(`rejected: [${reason}]`))
.catch(err => console.log(`error! : ${err}`))
.finally(() => console.log("done done done"));
除了“等待!”的部分,它可以正常工作(有点)。永远不会打印到控制台,以及 main
函数返回的承诺之后的任何行。
实际控制台输出为:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
我期待的是:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
awaited!
all right
问题:
- 为什么会这样?这里涉及的 nodejs 'magic'(我假设有调度程序)是什么?能推荐一下这里涉及到的nodejs内部的书籍吗?
- 如何更改代码以达到预期的输出?
谢谢。
我不认为这是关于调度程序或 nodejs 的。 lastValueFrom
要求传递给它的可观察对象在解析返回的承诺之前完成。在这种情况下,它永远不会完成。
一个解决方案
试试这个:
blocker.next(0);
blocker.complete();
然后您应该得到预期的输出。否则 blocker 永远不会完成,这意味着它永远不会发出最后一个值。
另一个解决方案
替换
single()
和
take(1)
然后即使 blocker 本身没有完成,您交给 lastValueFrom
的可观察对象也会完成。这将解决你的承诺。
另一种解决方案
试试这个:
blocker.next(0);
blocker.next(1);
然后您应该得到预期的输出。虽然这次将是 single
以 SequenceError
解析您的承诺(而不是之前的示例,其中承诺以值 0
解析)。
我已经对我正在使用的代码做了一个简化示例。
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function main(): Promise<void> {
const blocker = new rx.ReplaySubject<0>();
const subscription = rx.timer(0, 1000)
.pipe(
op.take(3),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
)
.subscribe({
next: x => console.log(`timer: next: [${x}]`),
error: err => console.log(`timer: error: [${err}]`),
complete: () => {
console.log("timer: complete");
blocker.next(0);
}
});
const promise = rx.lastValueFrom(blocker.asObservable()
.pipe(
op.single(),
op.observeOn(rx.queueScheduler),
op.subscribeOn(rx.queueScheduler)
));
console.log("prepared to await");
await promise;
console.log("awaited!");
subscription.unsubscribe();
}
main()
.then(
() => console.log("all right"),
reason => console.log(`rejected: [${reason}]`))
.catch(err => console.log(`error! : ${err}`))
.finally(() => console.log("done done done"));
除了“等待!”的部分,它可以正常工作(有点)。永远不会打印到控制台,以及 main
函数返回的承诺之后的任何行。
实际控制台输出为:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
我期待的是:
prepared to await
timer: next: [0]
timer: next: [1]
timer: next: [2]
timer: complete
awaited!
all right
问题:
- 为什么会这样?这里涉及的 nodejs 'magic'(我假设有调度程序)是什么?能推荐一下这里涉及到的nodejs内部的书籍吗?
- 如何更改代码以达到预期的输出?
谢谢。
我不认为这是关于调度程序或 nodejs 的。 lastValueFrom
要求传递给它的可观察对象在解析返回的承诺之前完成。在这种情况下,它永远不会完成。
一个解决方案
试试这个:
blocker.next(0);
blocker.complete();
然后您应该得到预期的输出。否则 blocker 永远不会完成,这意味着它永远不会发出最后一个值。
另一个解决方案
替换
single()
和
take(1)
然后即使 blocker 本身没有完成,您交给 lastValueFrom
的可观察对象也会完成。这将解决你的承诺。
另一种解决方案
试试这个:
blocker.next(0);
blocker.next(1);
然后您应该得到预期的输出。虽然这次将是 single
以 SequenceError
解析您的承诺(而不是之前的示例,其中承诺以值 0
解析)。