Angular 处理多个依赖订阅
Angular handle multiple dependent subscriptions
有什么帮助吗?
let notificationsMessages = []
countries.forEach((country: any) => {
this.isActiveCountry(country.isActive).subscribe((data) => { // // CALL #1 TO API
country.serverId = data.serverId;
this.uploadPhotoToApi(country.fileSource).subscribe((response) => { // CALL #2 TO API
// server return the uploaded file ID
country.serverFileID = response.serverFileId;
this.sendCountryToApi(country).subscribe((response) => { // CALL #3 TO API
this.countriesTable.delete(country.id).then(() => {
// Delete the uploaded country from local database
// if is the last country EMIT EVENT
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // Error to delete country from indexedDB
});
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // // Error to upload country to API
});
}, (errorCode) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // Error on sending file to API
});
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // // Error on country identification
});
});
如何在处理完所有 country
列表后发出事件?
我需要知道有多少国家上传成功,有多少国家上传失败。
例如,如果我有一个包含 50 个国家/地区的列表,当处理完最后一个国家/地区时,我想发出一个包含 2 个数组的事件...如下所示:
成功:[countryId1, countryId2...]
错误:['Country Id 2 failed on upload'、'Country Id 10 failed on file upload']
所有这 3 个调用都是相关的,必须按顺序执行...我无法更改此流程。
我是否应该在 CALL #3 成功以及所有错误函数上发出事件?
谢谢!
尽量避免将许多 .subscribe(
嵌套在彼此内部的诱惑。正如@praveen-soni 提到的,switchMap
可以帮助解决这个问题。
然后要在处理完所有国家/地区后获取状态,我认为 forkJoin
非常适合:它接收一个可观察对象列表,并在所有这些都完成后发出。
如何建立观察列表?您最初有一个国家列表,因此您可以将每个国家映射到处理该国家的可观察对象。我们还可以使用一个 catchError
,这样错误就不会关闭整个流,而只会关闭那个特定国家/地区的流。
我认为它看起来像:
const result$ = forkJoin(
countries.map((country) =>
this.isActiveCountry(country.isActive).pipe(
switchMap((data) => {
country.serverId = data.serverId;
return this.uploadPhotoToApi(country.fileSource);
}),
switchMap((response) => {
country.serverFileId = response.serverFileId;
return this.sendCountryToApi(country);
}),
switchMap(() => {
return this.countriesTable.delete(country.id);
}),
map(() => {
// Everything went OK, map it to an OK status
return {
type: "success",
};
}),
catchError((error) => {
// If any step fails, map it to an error type
return of({
type: "error",
error,
});
}),
take(1) // Make sure the observable completes
)
)
);
// result$ can now be used as any other observable
result$.subscribe(result => {
console.log(result);
})
这是一种方法。这可能有点矫枉过正,因为它为您提供了对错误处理的许多精细控制,但基本上总是以相同的方式处理错误。
即便如此,这可能比最直接的解决方案更容易扩展。
这里:
interface TaggedCountry{
success: boolean,
country: any,
error?: any
}
class ArbiratryClassName {
processCountry(country: any): Observable<TaggedCountry>{
return this.isActiveCountry(country.isActive).pipe(
// country now has serverId set
map(({serverId}) => ({...country, serverId})),
catchError(error => throwError(() => ({
success: false,
country,
error
}) as TaggedCountry)),
mergeMap((resultCountry: any) => this.uploadPhotoToApi(resultCountry.fileSource).pipe(
// country now has serverFileId set
map(({serverFileId}) => ({...resultCountry, serverFileId})),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
mergeMap((resultCountry: any) => this.sendCountryToApi(resultCountry).pipe(
// Ignore response from sendCountryToApi
mapTo(resultCountry),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
mergeMap((resultCountry: any) => from(this.countriesTable.delete(resultCountry.id)).pipe(
// Ignore response from countriesTable.delete
mapTo(resultCountry),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
map((resultCountry: any) => ({
success: true,
country: resultCountry
}) as TaggedCountry),
// Convert errors into regular emissions
catchError((tagged:TaggedCountry) => of(tagged))
);
}
processCountries(countries: any[]): Observable<{success: TaggedCountry[], errors: TaggedCountry[]}>{
return forkJoin(countries.map(c => this.processCountry(c))).pipe(
map((tagged: TaggedCountry[]) => ({
success: tagged.filter(tag => tag.success),
errors: tagged.filter(tag => !tag.success)
}))
)
}
doSomethingWith(countries: any[]): void {
this.processCountries(countries).subscribe({
next: countries => console.log("All countries processed. Result: ", countries),
complete: () => console.log("There's only one emission, so this should get called immediately after .next() was called"),
error: err => console.log("This is a surprise, is there an error we didn't catch earlier? Error: ", err)
})
}
}
如果看到相同的事情以不同的方式完成会有所帮助,这里是 processCountry
的较短实现
processCountry(country: any): Observable<TaggedCountry>{
return this.isActiveCountry(country.isActive).pipe(
tap((res:any) => country.serverId = res.serverId),
switchMap(_ => this.uploadPhotoToApi(country.fileSource)),
tap((res:any) => country.serverFileId = res.serverFileId),
switchMap(_ => this.sendCountryToApi(country)),
switchMap(_ => this.countriesTable.delete(country.id)),
// Tag our result as a success
map(_ => ({
success: true,
country
}) as TaggedCountry),
// Tag our errors and convert errors into regular emissions
catchError(error => of(({
success: false,
country,
error
}) as TaggedCountry))
);
}
有什么帮助吗?
let notificationsMessages = []
countries.forEach((country: any) => {
this.isActiveCountry(country.isActive).subscribe((data) => { // // CALL #1 TO API
country.serverId = data.serverId;
this.uploadPhotoToApi(country.fileSource).subscribe((response) => { // CALL #2 TO API
// server return the uploaded file ID
country.serverFileID = response.serverFileId;
this.sendCountryToApi(country).subscribe((response) => { // CALL #3 TO API
this.countriesTable.delete(country.id).then(() => {
// Delete the uploaded country from local database
// if is the last country EMIT EVENT
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // Error to delete country from indexedDB
});
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // // Error to upload country to API
});
}, (errorCode) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // Error on sending file to API
});
}, (error) => {
// if is the last country EMIT EVENT
notificationsMessages.push(error); // // Error on country identification
});
});
如何在处理完所有 country
列表后发出事件?
我需要知道有多少国家上传成功,有多少国家上传失败。
例如,如果我有一个包含 50 个国家/地区的列表,当处理完最后一个国家/地区时,我想发出一个包含 2 个数组的事件...如下所示: 成功:[countryId1, countryId2...] 错误:['Country Id 2 failed on upload'、'Country Id 10 failed on file upload']
所有这 3 个调用都是相关的,必须按顺序执行...我无法更改此流程。 我是否应该在 CALL #3 成功以及所有错误函数上发出事件? 谢谢!
尽量避免将许多 .subscribe(
嵌套在彼此内部的诱惑。正如@praveen-soni 提到的,switchMap
可以帮助解决这个问题。
然后要在处理完所有国家/地区后获取状态,我认为 forkJoin
非常适合:它接收一个可观察对象列表,并在所有这些都完成后发出。
如何建立观察列表?您最初有一个国家列表,因此您可以将每个国家映射到处理该国家的可观察对象。我们还可以使用一个 catchError
,这样错误就不会关闭整个流,而只会关闭那个特定国家/地区的流。
我认为它看起来像:
const result$ = forkJoin(
countries.map((country) =>
this.isActiveCountry(country.isActive).pipe(
switchMap((data) => {
country.serverId = data.serverId;
return this.uploadPhotoToApi(country.fileSource);
}),
switchMap((response) => {
country.serverFileId = response.serverFileId;
return this.sendCountryToApi(country);
}),
switchMap(() => {
return this.countriesTable.delete(country.id);
}),
map(() => {
// Everything went OK, map it to an OK status
return {
type: "success",
};
}),
catchError((error) => {
// If any step fails, map it to an error type
return of({
type: "error",
error,
});
}),
take(1) // Make sure the observable completes
)
)
);
// result$ can now be used as any other observable
result$.subscribe(result => {
console.log(result);
})
这是一种方法。这可能有点矫枉过正,因为它为您提供了对错误处理的许多精细控制,但基本上总是以相同的方式处理错误。
即便如此,这可能比最直接的解决方案更容易扩展。
这里:
interface TaggedCountry{
success: boolean,
country: any,
error?: any
}
class ArbiratryClassName {
processCountry(country: any): Observable<TaggedCountry>{
return this.isActiveCountry(country.isActive).pipe(
// country now has serverId set
map(({serverId}) => ({...country, serverId})),
catchError(error => throwError(() => ({
success: false,
country,
error
}) as TaggedCountry)),
mergeMap((resultCountry: any) => this.uploadPhotoToApi(resultCountry.fileSource).pipe(
// country now has serverFileId set
map(({serverFileId}) => ({...resultCountry, serverFileId})),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
mergeMap((resultCountry: any) => this.sendCountryToApi(resultCountry).pipe(
// Ignore response from sendCountryToApi
mapTo(resultCountry),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
mergeMap((resultCountry: any) => from(this.countriesTable.delete(resultCountry.id)).pipe(
// Ignore response from countriesTable.delete
mapTo(resultCountry),
catchError(error => throwError(() => ({
success: false,
country: resultCountry,
error
}) as TaggedCountry))
)),
map((resultCountry: any) => ({
success: true,
country: resultCountry
}) as TaggedCountry),
// Convert errors into regular emissions
catchError((tagged:TaggedCountry) => of(tagged))
);
}
processCountries(countries: any[]): Observable<{success: TaggedCountry[], errors: TaggedCountry[]}>{
return forkJoin(countries.map(c => this.processCountry(c))).pipe(
map((tagged: TaggedCountry[]) => ({
success: tagged.filter(tag => tag.success),
errors: tagged.filter(tag => !tag.success)
}))
)
}
doSomethingWith(countries: any[]): void {
this.processCountries(countries).subscribe({
next: countries => console.log("All countries processed. Result: ", countries),
complete: () => console.log("There's only one emission, so this should get called immediately after .next() was called"),
error: err => console.log("This is a surprise, is there an error we didn't catch earlier? Error: ", err)
})
}
}
如果看到相同的事情以不同的方式完成会有所帮助,这里是 processCountry
processCountry(country: any): Observable<TaggedCountry>{
return this.isActiveCountry(country.isActive).pipe(
tap((res:any) => country.serverId = res.serverId),
switchMap(_ => this.uploadPhotoToApi(country.fileSource)),
tap((res:any) => country.serverFileId = res.serverFileId),
switchMap(_ => this.sendCountryToApi(country)),
switchMap(_ => this.countriesTable.delete(country.id)),
// Tag our result as a success
map(_ => ({
success: true,
country
}) as TaggedCountry),
// Tag our errors and convert errors into regular emissions
catchError(error => of(({
success: false,
country,
error
}) as TaggedCountry))
);
}