以编程方式离开 rxjs-for-await 循环

Leave rxjs-for-await loop programatically

我有一个应用程序可以在服务器抽取 SSE 的地方打开流。 我希望循环 运行 直到期望值到达,然后离开循环并继续。 我查看了 takeWhile 运算符,但找不到实现它的方法。我也不知道我怎么能 unsubscribe 并且因为流永远不会完成...

const stream = this.sseService.returnAsObservable();
      for await (const data of eachValueFrom(stream)) {
        console.log(data);
        if (data.jobid === "JOB05879") {
          this.sseService.stopConnection();
          // how to get out now?
        }
      }
      console.log('we are out');

通常,混合 promises 和 observables 是一种反模式。如果您不介意,那么这里有一种方法可以只获取第一个数据,其中 data.jobid === "JOB05879"。您不需要 for-await,因为您只是期望从该流中获得 1 个值。

const stream = this.sseService.returnAsObservable().pipe(
  filter(data => data.jobid === "JOB05879"),
  first()
);

data = await stream.toPromise();
console.log(data);
this.sseService.stopConnection();
console.log('we are done');

没有承诺:

const stream = this.sseService.returnAsObservable().pipe(
  filter(data => data.jobid === "JOB05879"),
  first()
).subscribe({
  next: data => {
    console.log(data);
    this.sseService.stopConnection();
  },
  error: _ console.log("Data with jobid JOB05879 not found"),
  complete: () => console.log("We are done");
});

更新 #1:使用 forkJoin

您编写了以下内容,它首先获取一个 finito 数组,然后将该数组用作您的 notes.

的输入
const finito = await forkJoin([     
  this.jobsService.submitTestJob('blabla1').pipe(first()),     
  this.jobsService.submitTestJob('blabla2').pipe(first())
]).toPromise(); 

const notes = await forkJoin([     
  this.sseService.returnAsObservable().pipe(
    filter((d: any) => d.jobid === finito[0].jobid), 
    first()
  ),     
  this.sseService.returnAsObservable().pipe(
    filter((d: any) => d.jobid === finito[1].jobid), 
    first()
  ) 
]).toPromise();

这应该可行,尽管您应该进行大量错误检查。

这实际上可以使用 RxJS 运算符进行简化,这样您就不必等待每个 finito

const notes = await merge(
  this.jobsService.submitTestJob('blabla1').pipe(first()),
  this.jobsService.submitTestJob('blabla2').pipe(first()),
  this.jobsService.submitTestJob('blabla3').pipe(first()),
  this.jobsService.submitTestJob('blabla4').pipe(first()),
  this.jobsService.submitTestJob('blabla5').pipe(first()),
).pipe(
  mergeMap(finito => this.sseService.returnAsObservable().pipe(
    filter((d: any) => d.jobid === finito.jobid), 
    first()
  )),
  toArray()
).toPromise();

如果您愿意,还有一个使其更加简洁的步骤:

const notes = await from([
  "blabla1", 
  "blabla2", 
  "blabla3", 
  "blabla4", 
  "blabla5"
]).pipe(
  mergeMap(blabla => 
    this.jobsService.submitTestJob(blabla).pipe(first())
  ),
  mergeMap(finito => this.sseService.returnAsObservable().pipe(
    filter((d: any) => d.jobid === finito.jobid), 
    first()
  )),
  toArray()
).toPromise();

更简洁:

const notes = await from([1,2,3,4,5]).pipe(
  map(num => `blabla${num}`),
  mergeMap(blabla => 
    this.jobsService.submitTestJob(blabla).pipe(first())
  ),
  mergeMap(finito => this.sseService.returnAsObservable().pipe(
    filter((d: any) => d.jobid === finito.jobid), 
    first()
  )),
  toArray()
).toPromise();

仅调用 1 次 sseService

从您的示例代码来看,您似乎应该可以只调用一次 this.sseService.returnAsObservable()。它可以过滤任何允许的作业。

可能看起来像这样:

const params = [1,2,3,4,5];
const notes = await from(params).pipe(
  map(num => `blabla${num}`),
  mergeMap(blabla => 
    this.jobsService.submitTestJob(blabla).pipe(first())
  ),
  toArray(),
  mergeMap(finitoArray => this.sseService.returnAsObservable().pipe(
    filter(d => finitoArray.map(f => f.jobid).includes(d.jobid))
  )),
  take(params.length),
  toArray()
).toPromise();

并把这个例子带回你写的两阶段代码,看起来像这样:

const params = [1,2,3,4,5];
const streams = params
  .map(num => `blabla${num}`)
  .map(blabla => this.jobsService.submitTestJob(blabla).pipe(first())

const finito = await forkJoin(streams).toPromise(); 

const notes = await this.sseService.returnAsObservable().pipe(
  filter(d => finitoArray.map(f => f.jobid).includes(d.jobid)),
  take(params.length),
  toArray()
).toPromise();