RXJS 等待所有 Observables 完成和 Return 结果
RXJS Wait for All Observables to Complete and Return Results
我正在尝试创建一个 RX 流,它将异步执行 XHR 调用列表,然后等待它们完成,然后再进行下一次调用。
为了帮助解释,在普通 JS 中可以这样写:
try {
await* [
...requests.map(r => angularHttpService.get(`/foo/bar/${r}`))
];
} catch(e) { throw e }
// do something
这是我正在尝试的代码,但它 运行 它们是单独的,而不是等待它们全部完成后再继续。 (这是一个 NGRX 效果流,因此它与 vanilla rx 略有不同)。
mergeMap(
() => this.requests, concatMap((resqests) => from(resqests))),
(request) =>
this.myAngularHttpService
.get(`foo/bar/${request}`)
.pipe(catchError(e => of(new HttpError(e))))
),
switchMap(res => new DeleteSuccess())
您可以使用 forkJoin,它会从每个已完成的可观察对象中发出最后发出的值。以下是链接文档中的示例:
import { mergeMap } from 'rxjs/operators';
import { forkJoin } from 'rxjs/observable/forkJoin';
import { of } from 'rxjs/observable/of';
const myPromise = val =>
new Promise(resolve =>
setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
);
const source = of([1, 2, 3, 4, 5]);
//emit array of all 5 results
const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
/*
output:
[
"Promise Resolved: 1",
"Promise Resolved: 2",
"Promise Resolved: 3",
"Promise Resolved: 4",
"Promise Resolved: 5"
]
*/
const subscribe = example.subscribe(val => console.log(val));
Peter B Smith 也有这个不错的食谱,也使用 forkJoin
来提出相同的建议,我将 copy/past 下面的内容:
复制自:https://gist.github.com/peterbsmyth/ce94c0a5ddceb99bab24a761731d1f07
使用@ngrx/Effects
进行链式 API 调用
目的
此秘诀可用于烹调作为单个操作结果的链式 API 调用。
描述
在下面的示例中,调度了一个名为 POST_REPO
的操作,其目的是在 GitHub 上创建一个新的存储库,然后在创建后用新数据更新自述文件。
为此,GitHub API:
需要 4 API 次调用
- POST 一个新的 repostiry
- 获取新仓库的master分支
- 获取主分支上的文件
- PUT README.md 文件
POST_REPO's payload contains
payload.repo 包含 API 调用 1 所需的信息。
API 调用 1 的响应对于 API 调用 2 是必需的。
API 调用 2 的响应对于 API 调用 3 是必需的。
来自 API 调用 3 和 `payload.file 的响应,其中包含更新 README.md 文件所需的信息,对于 API 调用 4 是必需的。
使用 Observable.ForkJoin
使这成为可能。
例子
import { Injectable } from '@angular/core';
import { Effect, Actions } from '@ngrx/effects';
import { Action } from '@ngrx/store';
import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { handleError } from './handleError';
import { GithubService } from '../services/github.service';
import * as githubActions from '../actions/github';
@Injectable()
export class GitHubEffects {
@Effect()
postRepo$: Observable<Action> = this.actions$
.ofType(githubActions.POST_REPO)
.map((action: githubActions.PostRepo) => action.payload)
// return the payload and POST the repo
.switchMap((payload: any) => Observable.forkJoin([
Observable.of(payload),
this.githubService.postRepo(payload.repo)
]))
// return the repo and the master branch as an array
.switchMap((data: any) => {
const [payload, repo] = data;
return Observable.forkJoin([
Observable.of(payload),
Observable.of(repo),
this.githubService.getMasterBranch(repo.name)
]);
})
// return the payload, the repo, and get the sha for README
.switchMap((data: any) => {
const [payload, repo, branch] = data;
return Observable.forkJoin([
Observable.of(payload),
Observable.of(repo),
this.githubService.getFiles(repo.name, branch)
.map((files: any) => files.tree
.filter(file => file.path === 'README.md')
.map(file => file.sha)[0]
)
]);
})
// update README with data from payload.file
.switchMap((data: any) => {
const [payload, repo, sha] = data;
payload.file.sha = sha;
return this.githubService.putFile(repo.name, payload.file);
});
constructor(
private actions$: Actions,
private githubService: GithubService,
) {}
}
我正在尝试创建一个 RX 流,它将异步执行 XHR 调用列表,然后等待它们完成,然后再进行下一次调用。
为了帮助解释,在普通 JS 中可以这样写:
try {
await* [
...requests.map(r => angularHttpService.get(`/foo/bar/${r}`))
];
} catch(e) { throw e }
// do something
这是我正在尝试的代码,但它 运行 它们是单独的,而不是等待它们全部完成后再继续。 (这是一个 NGRX 效果流,因此它与 vanilla rx 略有不同)。
mergeMap(
() => this.requests, concatMap((resqests) => from(resqests))),
(request) =>
this.myAngularHttpService
.get(`foo/bar/${request}`)
.pipe(catchError(e => of(new HttpError(e))))
),
switchMap(res => new DeleteSuccess())
您可以使用 forkJoin,它会从每个已完成的可观察对象中发出最后发出的值。以下是链接文档中的示例:
import { mergeMap } from 'rxjs/operators';
import { forkJoin } from 'rxjs/observable/forkJoin';
import { of } from 'rxjs/observable/of';
const myPromise = val =>
new Promise(resolve =>
setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
);
const source = of([1, 2, 3, 4, 5]);
//emit array of all 5 results
const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
/*
output:
[
"Promise Resolved: 1",
"Promise Resolved: 2",
"Promise Resolved: 3",
"Promise Resolved: 4",
"Promise Resolved: 5"
]
*/
const subscribe = example.subscribe(val => console.log(val));
Peter B Smith 也有这个不错的食谱,也使用 forkJoin
来提出相同的建议,我将 copy/past 下面的内容:
复制自:https://gist.github.com/peterbsmyth/ce94c0a5ddceb99bab24a761731d1f07
使用@ngrx/Effects
进行链式 API 调用目的
此秘诀可用于烹调作为单个操作结果的链式 API 调用。
描述
在下面的示例中,调度了一个名为 POST_REPO
的操作,其目的是在 GitHub 上创建一个新的存储库,然后在创建后用新数据更新自述文件。
为此,GitHub API:
- POST 一个新的 repostiry
- 获取新仓库的master分支
- 获取主分支上的文件
- PUT README.md 文件
POST_REPO's payload contains
payload.repo 包含 API 调用 1 所需的信息。
API 调用 1 的响应对于 API 调用 2 是必需的。
API 调用 2 的响应对于 API 调用 3 是必需的。
来自 API 调用 3 和 `payload.file 的响应,其中包含更新 README.md 文件所需的信息,对于 API 调用 4 是必需的。
使用 Observable.ForkJoin
使这成为可能。
例子
import { Injectable } from '@angular/core';
import { Effect, Actions } from '@ngrx/effects';
import { Action } from '@ngrx/store';
import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { handleError } from './handleError';
import { GithubService } from '../services/github.service';
import * as githubActions from '../actions/github';
@Injectable()
export class GitHubEffects {
@Effect()
postRepo$: Observable<Action> = this.actions$
.ofType(githubActions.POST_REPO)
.map((action: githubActions.PostRepo) => action.payload)
// return the payload and POST the repo
.switchMap((payload: any) => Observable.forkJoin([
Observable.of(payload),
this.githubService.postRepo(payload.repo)
]))
// return the repo and the master branch as an array
.switchMap((data: any) => {
const [payload, repo] = data;
return Observable.forkJoin([
Observable.of(payload),
Observable.of(repo),
this.githubService.getMasterBranch(repo.name)
]);
})
// return the payload, the repo, and get the sha for README
.switchMap((data: any) => {
const [payload, repo, branch] = data;
return Observable.forkJoin([
Observable.of(payload),
Observable.of(repo),
this.githubService.getFiles(repo.name, branch)
.map((files: any) => files.tree
.filter(file => file.path === 'README.md')
.map(file => file.sha)[0]
)
]);
})
// update README with data from payload.file
.switchMap((data: any) => {
const [payload, repo, sha] = data;
payload.file.sha = sha;
return this.githubService.putFile(repo.name, payload.file);
});
constructor(
private actions$: Actions,
private githubService: GithubService,
) {}
}