承诺到可观察的节点js循环
Node js loop of promises to observable
我有一种方法可以对数据库进行大量异步查询以检索大量数据。为简单起见,假设每个查询 returns 一个整数数组。我想把这个方法变成可观察的,一个一个地输出数字。这部分工作正常。
问题始于 'take' 运算符 - 如果没有人在收听结果,我想停止数据库请求。我的问题是 'scroll' 函数在达到停止条件之前不会停止执行,即使 'largeQueryPromise' 由于 take(10) 运算符而不再收听它。
当订阅者由于各种原因取消订阅时,是否有可能停止 observable 的执行?
let ind = 0;
function dbRequest(): Promise<number[]> {
return new Promise(resolve => resolve([ind++, ind++]));
}
async function largeQuery(index: number) {
let res = await dbRequest();
return new Observable(observer => scroll(observer, res, index));
}
function scroll(observer: Subscriber<number>, res: number[], index: number) {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
observer.complete();
return;
}
res.forEach(value => observer.next(value));
dbRequest().then(arr => scroll(observer, arr, index));
}
async function largeQueryPromise(index: number) {
console.log(`started sequence ${index}`);
const obs = await largeQuery(index);
obs.pipe(take(10)).subscribe(
undefined,
console.error,
() => {
console.log(`stopped to listen for sequence ${index}`);
largeQueryPromise(++index).then();
});
}
largeQueryPromise(0).then();
Observer 有一个参数“closed”,表示该订阅者是否已取消订阅。知道这一点,解决方案就很简单了:
function scroll(observer: Subscriber<number>, res: number[], index: number) {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
observer.complete();
return;
}
for(let i=0; i<res.length && !observer.closed; i++)
observer.next(res[i]);
if(!observer.closed)
dbRequest().then(arr => scroll(observer, arr, index));
}
编辑:请注意,从技术上讲,您不需要在 for 循环中进行检查 - 所有 .next 都将是一个 noop。
您的 largeQuery 只能使用运算符来完成。当上一个请求发出时,使用 expand
递归调用 dbRequest()
。通过返回 EMPTY
结束这个递归。使用 concatAll
传播传入数组发射。
function largeQuery(index: number): Observable<number> {
console.log("largeQuery2 for", index);
return from(dbRequest()).pipe(
expand(res => {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
return EMPTY;
}
// The observable returned here gets subscribed to before the 'take' operator
// below ends the subscription. To prevent an additional call of 'dbRequest'
// at the end, the observable returned here has to be asynchronous.
// That's why 'timer' is used.
// If this doesn't turn out to be an issue for you, the line below could be
// replace with 'return defer(() => dbRequest())' or even 'return from(dbRequest())'
return timer(0).pipe(switchMap(() => dbRequest()));
}),
concatAll()
);
}
function recursiveLargeQuery(index: number) {
console.log(`started sequence ${index}`);
largeQuery(index).pipe(
take(10),
).subscribe(
v => console.log(v),
console.error,
() => {
console.log(`stopped to listen for sequence ${index}`);
if (index < 2) { // end the recursion at some point
recursiveLargeQuery(++index);
}
});
}
recursiveLargeQuery(0)
我有一种方法可以对数据库进行大量异步查询以检索大量数据。为简单起见,假设每个查询 returns 一个整数数组。我想把这个方法变成可观察的,一个一个地输出数字。这部分工作正常。
问题始于 'take' 运算符 - 如果没有人在收听结果,我想停止数据库请求。我的问题是 'scroll' 函数在达到停止条件之前不会停止执行,即使 'largeQueryPromise' 由于 take(10) 运算符而不再收听它。
当订阅者由于各种原因取消订阅时,是否有可能停止 observable 的执行?
let ind = 0;
function dbRequest(): Promise<number[]> {
return new Promise(resolve => resolve([ind++, ind++]));
}
async function largeQuery(index: number) {
let res = await dbRequest();
return new Observable(observer => scroll(observer, res, index));
}
function scroll(observer: Subscriber<number>, res: number[], index: number) {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
observer.complete();
return;
}
res.forEach(value => observer.next(value));
dbRequest().then(arr => scroll(observer, arr, index));
}
async function largeQueryPromise(index: number) {
console.log(`started sequence ${index}`);
const obs = await largeQuery(index);
obs.pipe(take(10)).subscribe(
undefined,
console.error,
() => {
console.log(`stopped to listen for sequence ${index}`);
largeQueryPromise(++index).then();
});
}
largeQueryPromise(0).then();
Observer 有一个参数“closed”,表示该订阅者是否已取消订阅。知道这一点,解决方案就很简单了:
function scroll(observer: Subscriber<number>, res: number[], index: number) {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
observer.complete();
return;
}
for(let i=0; i<res.length && !observer.closed; i++)
observer.next(res[i]);
if(!observer.closed)
dbRequest().then(arr => scroll(observer, arr, index));
}
编辑:请注意,从技术上讲,您不需要在 for 循环中进行检查 - 所有 .next 都将是一个 noop。
您的 largeQuery 只能使用运算符来完成。当上一个请求发出时,使用 expand
递归调用 dbRequest()
。通过返回 EMPTY
结束这个递归。使用 concatAll
传播传入数组发射。
function largeQuery(index: number): Observable<number> {
console.log("largeQuery2 for", index);
return from(dbRequest()).pipe(
expand(res => {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
return EMPTY;
}
// The observable returned here gets subscribed to before the 'take' operator
// below ends the subscription. To prevent an additional call of 'dbRequest'
// at the end, the observable returned here has to be asynchronous.
// That's why 'timer' is used.
// If this doesn't turn out to be an issue for you, the line below could be
// replace with 'return defer(() => dbRequest())' or even 'return from(dbRequest())'
return timer(0).pipe(switchMap(() => dbRequest()));
}),
concatAll()
);
}
function recursiveLargeQuery(index: number) {
console.log(`started sequence ${index}`);
largeQuery(index).pipe(
take(10),
).subscribe(
v => console.log(v),
console.error,
() => {
console.log(`stopped to listen for sequence ${index}`);
if (index < 2) { // end the recursion at some point
recursiveLargeQuery(++index);
}
});
}
recursiveLargeQuery(0)