RxJS:takeUntil 有多个动作和不同的过滤器?
RxJS: takeUntil with multiple actions and different filters?
我有一个要继续执行的 Observable,直到:
1) uploadActions.MARK_UPLOAD_AS_COMPLETE
动作是用特定的负载调用的
或
2) 使用任何负载调用 uploadActions.UPLOAD_FAILURE
操作
这是我所能得到的(但行不通):
return Observable.interval(5000)
.takeUntil(
action$
.ofType(
uploadActions.UPLOAD_FAILURE,
uploadActions.MARK_UPLOAD_AS_COMPLETE
)
.filter(a => { // <---- this filter only applies to uploadActions.MARK_UPLOAD_AS_COMPLETE
const completedFileHandle = a.payload;
return handle === completedFileHandle;
})
)
.mergeMap(action =>
...
);
有没有一种干净的方法可以实现这个目标?
我会把这两个条件分成不同的流,然后像这样合并它们:
const action$ = new Rx.Subject();
const uploadActions = {
UPLOAD_FAILURE: "UPLOAD_FAILURE",
MARK_UPLOAD_AS_COMPLETE: "MARK_UPLOAD_AS_COMPLETE"
};
const handle = 42;
window.setTimeout(() => action$.next({
type: uploadActions.MARK_UPLOAD_AS_COMPLETE,
payload: handle
}), 1200);
Rx.Observable.interval(500)
.takeUntil(
Rx.Observable.merge(
action$.filter(x => x.type === uploadActions.UPLOAD_FAILURE),
action$.filter(x => x.type === uploadActions.MARK_UPLOAD_AS_COMPLETE)
.filter(x => x.payload === handle)
)
).subscribe(
x => { console.log('Next: ', x); },
e => { console.log('Error: ', e); },
() => { console.log('Completed'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
例如,我必须使用 filter
运算符而不是 ofType
,因为 ofType
是 redux 的东西。
我有一个要继续执行的 Observable,直到:
1) uploadActions.MARK_UPLOAD_AS_COMPLETE
动作是用特定的负载调用的
或
2) 使用任何负载调用 uploadActions.UPLOAD_FAILURE
操作
这是我所能得到的(但行不通):
return Observable.interval(5000)
.takeUntil(
action$
.ofType(
uploadActions.UPLOAD_FAILURE,
uploadActions.MARK_UPLOAD_AS_COMPLETE
)
.filter(a => { // <---- this filter only applies to uploadActions.MARK_UPLOAD_AS_COMPLETE
const completedFileHandle = a.payload;
return handle === completedFileHandle;
})
)
.mergeMap(action =>
...
);
有没有一种干净的方法可以实现这个目标?
我会把这两个条件分成不同的流,然后像这样合并它们:
const action$ = new Rx.Subject();
const uploadActions = {
UPLOAD_FAILURE: "UPLOAD_FAILURE",
MARK_UPLOAD_AS_COMPLETE: "MARK_UPLOAD_AS_COMPLETE"
};
const handle = 42;
window.setTimeout(() => action$.next({
type: uploadActions.MARK_UPLOAD_AS_COMPLETE,
payload: handle
}), 1200);
Rx.Observable.interval(500)
.takeUntil(
Rx.Observable.merge(
action$.filter(x => x.type === uploadActions.UPLOAD_FAILURE),
action$.filter(x => x.type === uploadActions.MARK_UPLOAD_AS_COMPLETE)
.filter(x => x.payload === handle)
)
).subscribe(
x => { console.log('Next: ', x); },
e => { console.log('Error: ', e); },
() => { console.log('Completed'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
例如,我必须使用 filter
运算符而不是 ofType
,因为 ofType
是 redux 的东西。