承诺到可观察的节点js循环

Node js loop of promises to observable

我有一种方法可以对数据库进行大量异步查询以检索大量数据。为简单起见,假设每个查询 returns 一个整数数组。我想把这个方法变成可观察的,一个一个地输出数字。这部分工作正常。

问题始于 'take' 运算符 - 如果没有人在收听结果,我想停止数据库请求。我的问题是 'scroll' 函数在达到停止条件之前不会停止执行,即使 'largeQueryPromise' 由于 take(10) 运算符而不再收听它。

当订阅者由于各种原因取消订阅时,是否有可能停止 observable 的执行?

let ind = 0;
function dbRequest(): Promise<number[]> {
    return new Promise(resolve => resolve([ind++, ind++]));
}

async function largeQuery(index: number) {
    let res = await dbRequest();
    return new Observable(observer => scroll(observer, res, index));
}

function scroll(observer: Subscriber<number>, res: number[], index: number) {
    if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        observer.complete();
        return;
    }

    res.forEach(value => observer.next(value));
    dbRequest().then(arr => scroll(observer, arr, index));
}

async function largeQueryPromise(index: number) {
    console.log(`started sequence ${index}`);
    const obs = await largeQuery(index);
    obs.pipe(take(10)).subscribe(
        undefined, 
        console.error, 
        () => {
            console.log(`stopped to listen for sequence ${index}`);
            largeQueryPromise(++index).then();
        });
}

largeQueryPromise(0).then();

Observer 有一个参数“closed”,表示该订阅者是否已取消订阅。知道这一点,解决方案就很简单了:

function scroll(observer: Subscriber<number>, res: number[], index: number) {
    if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        observer.complete();
        return;
    }

    for(let i=0; i<res.length && !observer.closed; i++)
        observer.next(res[i]);

    if(!observer.closed)
        dbRequest().then(arr => scroll(observer, arr, index));
}

编辑:请注意,从技术上讲,您不需要在 for 循环中进行检查 - 所有 .next 都将是一个 noop。

您的 largeQuery 只能使用运算符来完成。当上一个请求发出时,使用 expand 递归调用 dbRequest()。通过返回 EMPTY 结束这个递归。使用 concatAll 传播传入数组发射。

function largeQuery(index: number): Observable<number> {
  console.log("largeQuery2 for", index);
  return from(dbRequest()).pipe(
    expand(res => {
      if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        return EMPTY;
      }
      // The observable returned here gets subscribed to before the 'take' operator
      // below ends the subscription. To prevent an additional call of 'dbRequest'
      // at the end, the observable returned here has to be asynchronous. 
      // That's why 'timer' is used. 
      // If this doesn't turn out to be an issue for you, the line below could be 
      // replace with 'return defer(() => dbRequest())' or even 'return from(dbRequest())'
      return timer(0).pipe(switchMap(() => dbRequest()));
    }),
    concatAll()
  );
}

function recursiveLargeQuery(index: number) {
  console.log(`started sequence ${index}`);
  largeQuery(index).pipe(
    take(10),
  ).subscribe(
    v => console.log(v), 
    console.error, 
    () => {
      console.log(`stopped to listen for sequence ${index}`);
      if (index < 2) { // end the recursion at some point
        recursiveLargeQuery(++index);
      }
    });
}

recursiveLargeQuery(0)

https://stackblitz.com/edit/rxjs-ihxkax?file=index.ts