如何将 combine/merge observables 转换为单个 observable 并将结果与输入相关联?
How to combine/merge observables into a single observable and associate result with the input?
我有一个 saveForm() 函数,它应该按顺序执行以下操作:
- 获取表单数据并将其作为文档添加到 FireStore 集合。
- 成功后,遍历 (attachmentsFormArray) 用户选择的所有文件并将每个文件上传到 FireStorage。
- 当所有文件都上传完成后,将每个文件的documentUrl分配给我们在步骤#1中保存的FireStore文档上的相应文件映射。然后进行 api 调用以实际保存更新的 firestore 文档。
下面是我的 saveForm() 函数:
saveForm() {
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection.add(fixedDepositEntity).then(documentRef => {
if (this.attachmentsFormArray.controls.length !== 0) {
this.attachmentsFormArray.controls.forEach(group => {
let fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
let uploadTask = fileRef.put(group.get('file').value);
// observe percentage changes
uploadTask.percentageChanges().subscribe(percent => {
group.get('percentComplete').setValue(Math.round(percent));
});
// get notified when the download URL is available
uploadTask.snapshotChanges().pipe(
finalize(() => {
fileRef.getDownloadURL().subscribe(url => {
group.get('downloadUrl').setValue(url);
});
}))
.subscribe();
});
}
});
}
目前,上面的代码简单地循环遍历 attachmentsFormArray,一旦文件上传,最后它将 downloadUrl 分配给 attachmentsFormArray。
当用户选择多个文件时,我有以下 handleFileInput() 事件处理程序:
handleFileInput(files: FileList) {
if (!files || files.length === 0) {
return;
}
Array.from(files).forEach(file => {
this.attachmentsFormArray.push(this.formBuilder.group({
fileName: [file.name],
fileSize: [file.size],
label: [''],
file: [file],
downloadUrl: [''],
percentComplete: [''],
uploadTaskState: ['']
}));
});
AngularFire 库提供了一个 snapshotChanges() 方法,该方法 returns Observable。我想要 combine/merge 所有这些 Observable(以便知道所有文件都已完全上传),然后订阅生成的 Observable。但我不确定如何将单个可观察结果与用户选择的相应文件对象相关联(如#3 中所述)。
我知道我们可以使用 RxJs 运算符实现此行为,但不确定在我的场景中使用哪一个。提前感谢任何帮助。
编辑 1:根据“Mrk Sef”的回答实施。大多数时候它工作正常。但是,有时未设置 downloadUrl。我无法理解此间歇性问题的原因。
saveForm() {
try {
this.fixedDepositsFormGroup.disable();
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection
.add(fixedDepositEntity)
.then(documentRef => {
this.isBusy = true;
// Changes will be mapped to an array of Observable, once this mapping
// is complete, we can subscribe and wait for them to finish
console.log('Voila! Form Submitted.');
if (this.attachmentsFormArray.controls.length !== 0) {
const changes = this.attachmentsFormArray.controls.map(
group => {
const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
const uploadTask = fileRef.put(group.get('file').value);
const percentageChanges$ = uploadTask.percentageChanges().pipe(
tap(percent => group.get('percentComplete').setValue(Math.round(percent)))
);
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
finalize(() => fileRef.getDownloadURL().subscribe(url => group.get('downloadUrl').setValue(url)))
);
return [percentageChanges$, snapshotChanges$];
}
).reduce((acc, val) => acc.concat(val), []);; // Turn our array of tuples into an array
// forkJoin doesn't emit until all source Observables complete
forkJoin(changes).subscribe(_ => {
// By now all files have been uploaded to FireStorage
// Now we update the attachments property in our fixed-deposit document
const attachmentValues = (this.getControlValue('attachments') as any[])
.map(item => <Attachment>{
fileName: item.fileName,
fileSize: item.fileSize,
label: item.label,
downloadUrl: item.downloadUrl
});
documentRef.update({ attachments: attachmentValues });
console.log("Files Uploaded Successfully and Document Updated !");
});
}
})
.finally(() => {
this.fixedDepositsFormGroup.enable();
this.isBusy = false;
});
} finally {
}
}
当第三方生成 observable 时,您看到的一个常见设计是用一些您在 call-time 知道但可能不知道您何时订阅的自定义信息来标记它。
例如获取每个标题以'M'开头的文档的第三个词:
const documents: Document[] = getDocumentsService();
wordStreams: Observable<[Document, HttpResponse]>[] = documents
.filter(file => file.title.charAt(0) === 'M')
.map(file => getThirdWordService(file.id).pipe(
map(serviceResponse => ([file, serviceResponse]))
);
merge(...wordStreams).subscribe(([file, serviceResponse]) => {
console.log(`The third word of ${file.title} is ${serviceResponse.value}`)
});
最大的收获是,通过将值映射到元组或 object(相同的模式适用于 objects、映射等),您可以通过中的操作传递该信息一个流。
唯一的问题是,如果您不小心,最终可能会得到一个非纯功能性的流(可能会对您的程序状态造成副作用)。
我不太确定你的例子在做什么,但这是我对你想要什么的最佳猜测:
saveForm() {
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection
.add(fixedDepositEntity)
.then(documentRef => {
// Changes will be mapped to an array of Observable, once this mapping
// is complete, we can subscribe and wait for them to finish
const changes = this.attachmentsFormArray.controls.map(
group => {
const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
const uploadTask = fileRef.put(group.get('file').value);
const percentageChanges$ = uploadTask.percentageChanges().pipe(
tap(percent => group.get('percentComplete').setValue(Math.round(percent)))
);
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
mergeMap(_ => fileRef.getDownloadURL()),
tap(url => group.get('downloadUrl').setValue(url))
);
return [percentageChanges$, snapshotChanges$];
}
).flat(); // Turn our array of tuples into an array
// forkJoin doesn't emit until all source Observables complete
forkJoin(changes).subscribe(_ =>
console.log("All changes are complete")
);
});
}
相反,如果您想在订阅之前延迟写出值,这是另一个选项,可以使用一些稍后使用的附加数据更清楚地标记可观察流:
saveForm() {
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection
.add(fixedDepositEntity)
.then(documentRef => {
// Changes will be mapped to an array of Observable, once this mapping
// is complete, we can subscribe and wait for them to finish
const changes = this.attachmentsFormArray.controls.map(
group => {
const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
const uploadTask = fileRef.put(group.get('file').value);
const percentageChanges$ = uploadTask.percentageChanges().pipe(
map(percent => ([group, percent]))
);
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
mergeMap(_ => fileRef.getDownloadURL()),
map(url => ([group, url]))
);
return [percentageChanges$, snapshotChanges$];
}
);
const percentageChanges$ = changes.map(([a, b]) => a);
const snapshotChanges$ = changes.map(([a, b]) => b);
merge(...percentageChanges$).subscribe({
next: ([group, percent]) => group.get('percentComplete').setValue(Math.round(percent)),
complete: _ => console.log("All percentageChanges complete")
});
merge(...snapshotChanges$).subscribe({
next: ([group, url]) => group.get('downloadUrl').setValue(url),
complete: _ => console.log("All snapshotChanges complete")
});
});
}
不言而喻,none这个是测试过的。我希望您可以使用此处描述的内容来重组您的解决方案,以包含文件或您认为相关的任何其他信息。
更新
我的解决方案创建了一个名为
的流
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
mergeMap(_ => fileRef.getDownloadURL()),
tap(url => group.get('downloadUrl').setValue(url))
);
这并不是您真正想要的,您想要一个仅在 uploadTask.snapshotChanges() 完成后才开始的流。 Finalize 很奇怪,因为它在失败和完成时都运行,我确定有一个可以配置的运算符来执行此操作,但我不知道如何实现。
我的解决方案创建了一个自定义运算符 (waitForEnd
),它在源完成或出错时发出布尔值并忽略源流中的所有其他元素
const waitForEnd = () =>
waitOn$ => new Observable(obsv => {
const final = (bool) => {
obsv.next(bool);
obsv.complete();
}
waitOn$.subscribe({
next: _ => {/*do nothing*/},
complete: () => final(true),
error: _ => final(false)
});
return {unsubscribe: () => waitOn$.unsubscribe()}
});
let snapshotChanges$ = uploadTask.snapshotChanges().pipe(
waitForEnd(),
mergeMap(_ => fileRef.getDownloadURL()),
tap(url => group.get('downloadUrl').setValue(url))
);
snapshotChanges$
将等待 uploadTask.snapshotChanges()
结束,然后才会获取下载 URL 并在完成之前设置值。
我有一个 saveForm() 函数,它应该按顺序执行以下操作:
- 获取表单数据并将其作为文档添加到 FireStore 集合。
- 成功后,遍历 (attachmentsFormArray) 用户选择的所有文件并将每个文件上传到 FireStorage。
- 当所有文件都上传完成后,将每个文件的documentUrl分配给我们在步骤#1中保存的FireStore文档上的相应文件映射。然后进行 api 调用以实际保存更新的 firestore 文档。
下面是我的 saveForm() 函数:
saveForm() {
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection.add(fixedDepositEntity).then(documentRef => {
if (this.attachmentsFormArray.controls.length !== 0) {
this.attachmentsFormArray.controls.forEach(group => {
let fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
let uploadTask = fileRef.put(group.get('file').value);
// observe percentage changes
uploadTask.percentageChanges().subscribe(percent => {
group.get('percentComplete').setValue(Math.round(percent));
});
// get notified when the download URL is available
uploadTask.snapshotChanges().pipe(
finalize(() => {
fileRef.getDownloadURL().subscribe(url => {
group.get('downloadUrl').setValue(url);
});
}))
.subscribe();
});
}
});
}
目前,上面的代码简单地循环遍历 attachmentsFormArray,一旦文件上传,最后它将 downloadUrl 分配给 attachmentsFormArray。
当用户选择多个文件时,我有以下 handleFileInput() 事件处理程序:
handleFileInput(files: FileList) {
if (!files || files.length === 0) {
return;
}
Array.from(files).forEach(file => {
this.attachmentsFormArray.push(this.formBuilder.group({
fileName: [file.name],
fileSize: [file.size],
label: [''],
file: [file],
downloadUrl: [''],
percentComplete: [''],
uploadTaskState: ['']
}));
});
AngularFire 库提供了一个 snapshotChanges() 方法,该方法 returns Observable
我知道我们可以使用 RxJs 运算符实现此行为,但不确定在我的场景中使用哪一个。提前感谢任何帮助。
编辑 1:根据“Mrk Sef”的回答实施。大多数时候它工作正常。但是,有时未设置 downloadUrl。我无法理解此间歇性问题的原因。
saveForm() {
try {
this.fixedDepositsFormGroup.disable();
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection
.add(fixedDepositEntity)
.then(documentRef => {
this.isBusy = true;
// Changes will be mapped to an array of Observable, once this mapping
// is complete, we can subscribe and wait for them to finish
console.log('Voila! Form Submitted.');
if (this.attachmentsFormArray.controls.length !== 0) {
const changes = this.attachmentsFormArray.controls.map(
group => {
const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
const uploadTask = fileRef.put(group.get('file').value);
const percentageChanges$ = uploadTask.percentageChanges().pipe(
tap(percent => group.get('percentComplete').setValue(Math.round(percent)))
);
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
finalize(() => fileRef.getDownloadURL().subscribe(url => group.get('downloadUrl').setValue(url)))
);
return [percentageChanges$, snapshotChanges$];
}
).reduce((acc, val) => acc.concat(val), []);; // Turn our array of tuples into an array
// forkJoin doesn't emit until all source Observables complete
forkJoin(changes).subscribe(_ => {
// By now all files have been uploaded to FireStorage
// Now we update the attachments property in our fixed-deposit document
const attachmentValues = (this.getControlValue('attachments') as any[])
.map(item => <Attachment>{
fileName: item.fileName,
fileSize: item.fileSize,
label: item.label,
downloadUrl: item.downloadUrl
});
documentRef.update({ attachments: attachmentValues });
console.log("Files Uploaded Successfully and Document Updated !");
});
}
})
.finally(() => {
this.fixedDepositsFormGroup.enable();
this.isBusy = false;
});
} finally {
}
}
当第三方生成 observable 时,您看到的一个常见设计是用一些您在 call-time 知道但可能不知道您何时订阅的自定义信息来标记它。
例如获取每个标题以'M'开头的文档的第三个词:
const documents: Document[] = getDocumentsService();
wordStreams: Observable<[Document, HttpResponse]>[] = documents
.filter(file => file.title.charAt(0) === 'M')
.map(file => getThirdWordService(file.id).pipe(
map(serviceResponse => ([file, serviceResponse]))
);
merge(...wordStreams).subscribe(([file, serviceResponse]) => {
console.log(`The third word of ${file.title} is ${serviceResponse.value}`)
});
最大的收获是,通过将值映射到元组或 object(相同的模式适用于 objects、映射等),您可以通过中的操作传递该信息一个流。
唯一的问题是,如果您不小心,最终可能会得到一个非纯功能性的流(可能会对您的程序状态造成副作用)。
我不太确定你的例子在做什么,但这是我对你想要什么的最佳猜测:
saveForm() {
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection
.add(fixedDepositEntity)
.then(documentRef => {
// Changes will be mapped to an array of Observable, once this mapping
// is complete, we can subscribe and wait for them to finish
const changes = this.attachmentsFormArray.controls.map(
group => {
const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
const uploadTask = fileRef.put(group.get('file').value);
const percentageChanges$ = uploadTask.percentageChanges().pipe(
tap(percent => group.get('percentComplete').setValue(Math.round(percent)))
);
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
mergeMap(_ => fileRef.getDownloadURL()),
tap(url => group.get('downloadUrl').setValue(url))
);
return [percentageChanges$, snapshotChanges$];
}
).flat(); // Turn our array of tuples into an array
// forkJoin doesn't emit until all source Observables complete
forkJoin(changes).subscribe(_ =>
console.log("All changes are complete")
);
});
}
相反,如果您想在订阅之前延迟写出值,这是另一个选项,可以使用一些稍后使用的附加数据更清楚地标记可观察流:
saveForm() {
let fixedDepositEntity = this.getEntityFromForm();
this.fixedDepositsFirestoreCollection
.add(fixedDepositEntity)
.then(documentRef => {
// Changes will be mapped to an array of Observable, once this mapping
// is complete, we can subscribe and wait for them to finish
const changes = this.attachmentsFormArray.controls.map(
group => {
const fileRef = this.fireStorage.ref(this.fixedDepositsStorageFolderPath + group.get('fileName').value);
const uploadTask = fileRef.put(group.get('file').value);
const percentageChanges$ = uploadTask.percentageChanges().pipe(
map(percent => ([group, percent]))
);
const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
mergeMap(_ => fileRef.getDownloadURL()),
map(url => ([group, url]))
);
return [percentageChanges$, snapshotChanges$];
}
);
const percentageChanges$ = changes.map(([a, b]) => a);
const snapshotChanges$ = changes.map(([a, b]) => b);
merge(...percentageChanges$).subscribe({
next: ([group, percent]) => group.get('percentComplete').setValue(Math.round(percent)),
complete: _ => console.log("All percentageChanges complete")
});
merge(...snapshotChanges$).subscribe({
next: ([group, url]) => group.get('downloadUrl').setValue(url),
complete: _ => console.log("All snapshotChanges complete")
});
});
}
不言而喻,none这个是测试过的。我希望您可以使用此处描述的内容来重组您的解决方案,以包含文件或您认为相关的任何其他信息。
更新
我的解决方案创建了一个名为
的流const snapshotChanges$ = uploadTask.snapshotChanges().pipe(
mergeMap(_ => fileRef.getDownloadURL()),
tap(url => group.get('downloadUrl').setValue(url))
);
这并不是您真正想要的,您想要一个仅在 uploadTask.snapshotChanges() 完成后才开始的流。 Finalize 很奇怪,因为它在失败和完成时都运行,我确定有一个可以配置的运算符来执行此操作,但我不知道如何实现。
我的解决方案创建了一个自定义运算符 (waitForEnd
),它在源完成或出错时发出布尔值并忽略源流中的所有其他元素
const waitForEnd = () =>
waitOn$ => new Observable(obsv => {
const final = (bool) => {
obsv.next(bool);
obsv.complete();
}
waitOn$.subscribe({
next: _ => {/*do nothing*/},
complete: () => final(true),
error: _ => final(false)
});
return {unsubscribe: () => waitOn$.unsubscribe()}
});
let snapshotChanges$ = uploadTask.snapshotChanges().pipe(
waitForEnd(),
mergeMap(_ => fileRef.getDownloadURL()),
tap(url => group.get('downloadUrl').setValue(url))
);
snapshotChanges$
将等待 uploadTask.snapshotChanges()
结束,然后才会获取下载 URL 并在完成之前设置值。