如何等待其他 epics 完成然后调用回调?
How to wait for other epics to complete and then invoke a callback?
我试图在第一个史诗中映射多个动作,在第二个史诗中处理它们,然后在第一个史诗中调用回调。
这是我看到的流程:
- fetchMultipleOperationsEpic 监听 OPERATIONS_FETCH_ALL 和
创建一个 fetchOperations 操作流。
- 然后 fetchOperationsEpic 启动,使用
每个调度操作的 fetchOperationData。
- 每次操作的数据都取完后,
fetchMultipleOperationsEpic 应该调用回调。
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
switchMap(({ payload }) => {
const stream = of(...payload.ids.map(processId => fetchOperations({ processId })));
// PROBLEM HERE: 'done' is logged before actions from fetchOperationsEpic complete
return stream.pipe(finalize(() => console.log('done')))
})
)
const fetchOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH),
mergeMap(({ payload }) => fetchOperationData(payload.processId))
)
const fetchOperationData = processId => {
api.fetchOperationsByProcess(processId).pipe(
mergeMap(operations => of(fetchSuccess(), fooAction(), barAction())),
catchError(error => of(fetchOperationsFailure({ processId, error })))
)
}
也尝试过 forkJoin 但没有成功:
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
switchMap(({ payload }) => {
const observables = [...payload.ids.map(processId => of(fetchOperations({ processId })))];
// Doesn't wait as well
return forkJoin(observables).pipe(
mergeMap(o => o),
finalize(() => console.log('done'))
)
})
)
事实上,我让它工作的唯一方法是避免调用 fetchOperations 操作(从而避免调用 fetchOperationsEpic),并直接调用 fetchOperationData:
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
mergeMap(({ payload }) => {
const observables = [...payload.ids.map(id => fetchOperationData(id))];
return merge(...observables)
.pipe(finalize(() => console.log('done')))
})
)
很好。我只是想知道我最初的方法是否真的可行。
你认为你有一个 "problem here":
return stream.pipe(finalize(() => console.log('done')))
这实际上是正确的。 finalize
将 运行 紧接着 其正上方的 of
返回的操作列表 调度 到商店。那是因为您已经明确地将 finalize
链接到那个内部可观察对象。
您正在考虑的方法的问题是,一旦某个操作进入商店,它就不再在该特定可观察对象的范围内。您 不能 只是等待其他 epics 中特定操作的处理结束,因为最终,epics 永远不会真正结束。在严格的 RxJS 意义上,可观察对象永远不会发出 complete notification.
一般来说,我会建议研究使用单个史诗的解决方案(可能与您上一个示例的思路相同)。这使您可以简单地链接运算符,例如 finalize
,而无需跳过太多环节。您仍然可以重构您的代码,以便在您想要 share/duplicate 不同 epics.
之间的业务逻辑时使用可重用函数
您还可以研究依赖于 inter-epic 通过不同操作进行通信的解决方案。不过,这可能会变得棘手,具体取决于您的工作流程的复杂性。您需要每个操作来唯一标识 sub-processes。这是一个想法:
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
switchMap(({ payload }) =>
merge(...payload.ids.map(processId =>
concat(
of(fetchOperations({ processId })),
// Note: This keeps the observable running until either
// the corresponding "success" or "failure" action is
// dispatched:
action$.pipe(
filter(action => (action.type === FETCH_SUCCESS || action.type === FETCH_FAILURE) && action.payload.processId === processId),
first(),
ignoreElements(),
),
)
)).pipe(
finalize(() => console.log('done')),
)
)
)
const fetchOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH),
mergeMap(({ payload }) => fetchOperationData(payload.processId))
)
const fetchOperationData = processId => {
api.fetchOperationsByProcess(processId).pipe(
// Note: We have to pass along the `processId` in the
// "success" action so that the inner observables in
// `fetchMultipleOperationsEpic` can complete:
mergeMap(operations => of(fetchSuccess({ processId }), fooAction(), barAction())),
catchError(error => of(fetchOperationsFailure({ processId, error })))
)
}
我试图在第一个史诗中映射多个动作,在第二个史诗中处理它们,然后在第一个史诗中调用回调。
这是我看到的流程:
- fetchMultipleOperationsEpic 监听 OPERATIONS_FETCH_ALL 和 创建一个 fetchOperations 操作流。
- 然后 fetchOperationsEpic 启动,使用 每个调度操作的 fetchOperationData。
- 每次操作的数据都取完后, fetchMultipleOperationsEpic 应该调用回调。
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
switchMap(({ payload }) => {
const stream = of(...payload.ids.map(processId => fetchOperations({ processId })));
// PROBLEM HERE: 'done' is logged before actions from fetchOperationsEpic complete
return stream.pipe(finalize(() => console.log('done')))
})
)
const fetchOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH),
mergeMap(({ payload }) => fetchOperationData(payload.processId))
)
const fetchOperationData = processId => {
api.fetchOperationsByProcess(processId).pipe(
mergeMap(operations => of(fetchSuccess(), fooAction(), barAction())),
catchError(error => of(fetchOperationsFailure({ processId, error })))
)
}
也尝试过 forkJoin 但没有成功:
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
switchMap(({ payload }) => {
const observables = [...payload.ids.map(processId => of(fetchOperations({ processId })))];
// Doesn't wait as well
return forkJoin(observables).pipe(
mergeMap(o => o),
finalize(() => console.log('done'))
)
})
)
事实上,我让它工作的唯一方法是避免调用 fetchOperations 操作(从而避免调用 fetchOperationsEpic),并直接调用 fetchOperationData:
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
mergeMap(({ payload }) => {
const observables = [...payload.ids.map(id => fetchOperationData(id))];
return merge(...observables)
.pipe(finalize(() => console.log('done')))
})
)
很好。我只是想知道我最初的方法是否真的可行。
你认为你有一个 "problem here":
return stream.pipe(finalize(() => console.log('done')))
这实际上是正确的。 finalize
将 运行 紧接着 其正上方的 of
返回的操作列表 调度 到商店。那是因为您已经明确地将 finalize
链接到那个内部可观察对象。
您正在考虑的方法的问题是,一旦某个操作进入商店,它就不再在该特定可观察对象的范围内。您 不能 只是等待其他 epics 中特定操作的处理结束,因为最终,epics 永远不会真正结束。在严格的 RxJS 意义上,可观察对象永远不会发出 complete notification.
一般来说,我会建议研究使用单个史诗的解决方案(可能与您上一个示例的思路相同)。这使您可以简单地链接运算符,例如 finalize
,而无需跳过太多环节。您仍然可以重构您的代码,以便在您想要 share/duplicate 不同 epics.
您还可以研究依赖于 inter-epic 通过不同操作进行通信的解决方案。不过,这可能会变得棘手,具体取决于您的工作流程的复杂性。您需要每个操作来唯一标识 sub-processes。这是一个想法:
const fetchMultipleOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH_ALL),
switchMap(({ payload }) =>
merge(...payload.ids.map(processId =>
concat(
of(fetchOperations({ processId })),
// Note: This keeps the observable running until either
// the corresponding "success" or "failure" action is
// dispatched:
action$.pipe(
filter(action => (action.type === FETCH_SUCCESS || action.type === FETCH_FAILURE) && action.payload.processId === processId),
first(),
ignoreElements(),
),
)
)).pipe(
finalize(() => console.log('done')),
)
)
)
const fetchOperationsEpic = action$ => action$.pipe(
ofType(OPERATIONS_FETCH),
mergeMap(({ payload }) => fetchOperationData(payload.processId))
)
const fetchOperationData = processId => {
api.fetchOperationsByProcess(processId).pipe(
// Note: We have to pass along the `processId` in the
// "success" action so that the inner observables in
// `fetchMultipleOperationsEpic` can complete:
mergeMap(operations => of(fetchSuccess({ processId }), fooAction(), barAction())),
catchError(error => of(fetchOperationsFailure({ processId, error })))
)
}