以编程方式离开 rxjs-for-await 循环
Leave rxjs-for-await loop programatically
我有一个应用程序可以在服务器抽取 SSE 的地方打开流。
我希望循环 运行 直到期望值到达,然后离开循环并继续。
我查看了 takeWhile
运算符,但找不到实现它的方法。我也不知道我怎么能 unsubscribe
并且因为流永远不会完成...
const stream = this.sseService.returnAsObservable();
for await (const data of eachValueFrom(stream)) {
console.log(data);
if (data.jobid === "JOB05879") {
this.sseService.stopConnection();
// how to get out now?
}
}
console.log('we are out');
通常,混合 promises 和 observables 是一种反模式。如果您不介意,那么这里有一种方法可以只获取第一个数据,其中 data.jobid === "JOB05879"。您不需要 for-await,因为您只是期望从该流中获得 1 个值。
const stream = this.sseService.returnAsObservable().pipe(
filter(data => data.jobid === "JOB05879"),
first()
);
data = await stream.toPromise();
console.log(data);
this.sseService.stopConnection();
console.log('we are done');
没有承诺:
const stream = this.sseService.returnAsObservable().pipe(
filter(data => data.jobid === "JOB05879"),
first()
).subscribe({
next: data => {
console.log(data);
this.sseService.stopConnection();
},
error: _ console.log("Data with jobid JOB05879 not found"),
complete: () => console.log("We are done");
});
更新 #1:使用 forkJoin
您编写了以下内容,它首先获取一个 finito
数组,然后将该数组用作您的 notes
.
的输入
const finito = await forkJoin([
this.jobsService.submitTestJob('blabla1').pipe(first()),
this.jobsService.submitTestJob('blabla2').pipe(first())
]).toPromise();
const notes = await forkJoin([
this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito[0].jobid),
first()
),
this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito[1].jobid),
first()
)
]).toPromise();
这应该可行,尽管您应该进行大量错误检查。
这实际上可以使用 RxJS 运算符进行简化,这样您就不必等待每个 finito
const notes = await merge(
this.jobsService.submitTestJob('blabla1').pipe(first()),
this.jobsService.submitTestJob('blabla2').pipe(first()),
this.jobsService.submitTestJob('blabla3').pipe(first()),
this.jobsService.submitTestJob('blabla4').pipe(first()),
this.jobsService.submitTestJob('blabla5').pipe(first()),
).pipe(
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid),
first()
)),
toArray()
).toPromise();
如果您愿意,还有一个使其更加简洁的步骤:
const notes = await from([
"blabla1",
"blabla2",
"blabla3",
"blabla4",
"blabla5"
]).pipe(
mergeMap(blabla =>
this.jobsService.submitTestJob(blabla).pipe(first())
),
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid),
first()
)),
toArray()
).toPromise();
更简洁:
const notes = await from([1,2,3,4,5]).pipe(
map(num => `blabla${num}`),
mergeMap(blabla =>
this.jobsService.submitTestJob(blabla).pipe(first())
),
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid),
first()
)),
toArray()
).toPromise();
仅调用 1 次 sseService
从您的示例代码来看,您似乎应该可以只调用一次 this.sseService.returnAsObservable()。它可以过滤任何允许的作业。
可能看起来像这样:
const params = [1,2,3,4,5];
const notes = await from(params).pipe(
map(num => `blabla${num}`),
mergeMap(blabla =>
this.jobsService.submitTestJob(blabla).pipe(first())
),
toArray(),
mergeMap(finitoArray => this.sseService.returnAsObservable().pipe(
filter(d => finitoArray.map(f => f.jobid).includes(d.jobid))
)),
take(params.length),
toArray()
).toPromise();
并把这个例子带回你写的两阶段代码,看起来像这样:
const params = [1,2,3,4,5];
const streams = params
.map(num => `blabla${num}`)
.map(blabla => this.jobsService.submitTestJob(blabla).pipe(first())
const finito = await forkJoin(streams).toPromise();
const notes = await this.sseService.returnAsObservable().pipe(
filter(d => finitoArray.map(f => f.jobid).includes(d.jobid)),
take(params.length),
toArray()
).toPromise();
我有一个应用程序可以在服务器抽取 SSE 的地方打开流。
我希望循环 运行 直到期望值到达,然后离开循环并继续。
我查看了 takeWhile
运算符,但找不到实现它的方法。我也不知道我怎么能 unsubscribe
并且因为流永远不会完成...
const stream = this.sseService.returnAsObservable();
for await (const data of eachValueFrom(stream)) {
console.log(data);
if (data.jobid === "JOB05879") {
this.sseService.stopConnection();
// how to get out now?
}
}
console.log('we are out');
通常,混合 promises 和 observables 是一种反模式。如果您不介意,那么这里有一种方法可以只获取第一个数据,其中 data.jobid === "JOB05879"。您不需要 for-await,因为您只是期望从该流中获得 1 个值。
const stream = this.sseService.returnAsObservable().pipe(
filter(data => data.jobid === "JOB05879"),
first()
);
data = await stream.toPromise();
console.log(data);
this.sseService.stopConnection();
console.log('we are done');
没有承诺:
const stream = this.sseService.returnAsObservable().pipe(
filter(data => data.jobid === "JOB05879"),
first()
).subscribe({
next: data => {
console.log(data);
this.sseService.stopConnection();
},
error: _ console.log("Data with jobid JOB05879 not found"),
complete: () => console.log("We are done");
});
更新 #1:使用 forkJoin
您编写了以下内容,它首先获取一个 finito
数组,然后将该数组用作您的 notes
.
const finito = await forkJoin([
this.jobsService.submitTestJob('blabla1').pipe(first()),
this.jobsService.submitTestJob('blabla2').pipe(first())
]).toPromise();
const notes = await forkJoin([
this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito[0].jobid),
first()
),
this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito[1].jobid),
first()
)
]).toPromise();
这应该可行,尽管您应该进行大量错误检查。
这实际上可以使用 RxJS 运算符进行简化,这样您就不必等待每个 finito
const notes = await merge(
this.jobsService.submitTestJob('blabla1').pipe(first()),
this.jobsService.submitTestJob('blabla2').pipe(first()),
this.jobsService.submitTestJob('blabla3').pipe(first()),
this.jobsService.submitTestJob('blabla4').pipe(first()),
this.jobsService.submitTestJob('blabla5').pipe(first()),
).pipe(
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid),
first()
)),
toArray()
).toPromise();
如果您愿意,还有一个使其更加简洁的步骤:
const notes = await from([
"blabla1",
"blabla2",
"blabla3",
"blabla4",
"blabla5"
]).pipe(
mergeMap(blabla =>
this.jobsService.submitTestJob(blabla).pipe(first())
),
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid),
first()
)),
toArray()
).toPromise();
更简洁:
const notes = await from([1,2,3,4,5]).pipe(
map(num => `blabla${num}`),
mergeMap(blabla =>
this.jobsService.submitTestJob(blabla).pipe(first())
),
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid),
first()
)),
toArray()
).toPromise();
仅调用 1 次 sseService
从您的示例代码来看,您似乎应该可以只调用一次 this.sseService.returnAsObservable()。它可以过滤任何允许的作业。
可能看起来像这样:
const params = [1,2,3,4,5];
const notes = await from(params).pipe(
map(num => `blabla${num}`),
mergeMap(blabla =>
this.jobsService.submitTestJob(blabla).pipe(first())
),
toArray(),
mergeMap(finitoArray => this.sseService.returnAsObservable().pipe(
filter(d => finitoArray.map(f => f.jobid).includes(d.jobid))
)),
take(params.length),
toArray()
).toPromise();
并把这个例子带回你写的两阶段代码,看起来像这样:
const params = [1,2,3,4,5];
const streams = params
.map(num => `blabla${num}`)
.map(blabla => this.jobsService.submitTestJob(blabla).pipe(first())
const finito = await forkJoin(streams).toPromise();
const notes = await this.sseService.returnAsObservable().pipe(
filter(d => finitoArray.map(f => f.jobid).includes(d.jobid)),
take(params.length),
toArray()
).toPromise();