RxJs:在所有取消订阅时中止延迟和共享的可观察对象
RxJs: Abort a deferred and shared observable when all unsubscribe
我想创建一个运行长轮询操作的 rxjs
Observable
。
每次迭代都会发出中间结果。
当 isComplete
returns 为真时,Observable
完成。
此函数应按如下方式运行
- 它应该只有在 至少一个订阅者
时才会开始
- 它应该允许多个订阅者共享结果
- 如果没有订阅者,它应该中止轮询并取消呼叫
以下代码可以正常运行并满足条件(1)和(2):
function longPollingAction(fetch: () => Promise<Response>, cancel: () => {}): Observable<Response> {
return defer(() => { // defer to start running when there's a single subscriber
return from(fetch()).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
);
}).pipe(share()); // share to allow multiple subscribers
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
如何修改此代码以满足 (3) 的要求?在当前的实现中,轮询停止了,但是我该如何调用 cancel
?
使用定稿
您可以使用 finalize
调用取消。这可能是这样的:
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => void
): Observable<Response> {
// defer to turn eager promise into lazy observable
return defer(fetch).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
finalize(cancel),
share() // share to allow multiple subscribers
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
回调 complete
tap 操作员可以访问 next
、error
和 complete
发射。对于 callback: () => void
,这已经足够了。
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => void
): Observable<Response> {
// defer to turn eager promise into lazy observable
return defer(fetch).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
tap({
complete: cancel
}),
share() // share to allow multiple subscribers
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
回调 unsubscribe
我认为不存在这样的运算符,但我们可以很容易地创建一个。此运算符只会在取消订阅时触发回调。它会忽略 error
和 complete
.
function onUnsubscribe<T>(
fn: () => void
): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),
error: bindOn("error"),
complete: bindOn("complete")
});
return {
unsubscribe: () => {
fn();
sub.unsubscribe()
}
};
});
}
那么你可以这样使用它:
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => void
): Observable<Response> {
// defer to turn eager promise into lazy observable
return defer(fetch).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
onUnsubscribe(cancel),
share() // share to allow multiple subscribers
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
由于 share
正在管理您的订阅并且共享只会退订一次 refCount < 1
,因此在这种情况下调用取消的唯一方法是没有订阅者。
给猫剥皮的方法不止一种,但我会这样做:
const onUnsubscribe = (callback: () => void) => <T>(source$: Observable<T>) =>
new Observable<T>(observer => {
let isSourceDone = false;
const subscription = source$.subscribe(
val => {
observer.next(val);
},
e => {
isSourceDone = true;
observer.error(e);
},
() => {
isSourceDone = true;
observer.complete();
}
);
return () => {
if (isSourceDone) return;
callback();
subscription.unsubscribe();
};
});
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => {}
): Observable<Response> {
const lazyFetch$ = defer(() => fetch());
return lazyFetch$.pipe(
expand(() => timer(1000).pipe(mergeMapTo(lazyFetch$))),
takeWhile<Response>(isComplete, false),
onUnsubscribe(cancel),
share()
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
我想创建一个运行长轮询操作的 rxjs
Observable
。
每次迭代都会发出中间结果。
当 isComplete
returns 为真时,Observable
完成。
此函数应按如下方式运行
- 它应该只有在 至少一个订阅者 时才会开始
- 它应该允许多个订阅者共享结果
- 如果没有订阅者,它应该中止轮询并取消呼叫
以下代码可以正常运行并满足条件(1)和(2):
function longPollingAction(fetch: () => Promise<Response>, cancel: () => {}): Observable<Response> {
return defer(() => { // defer to start running when there's a single subscriber
return from(fetch()).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
);
}).pipe(share()); // share to allow multiple subscribers
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
如何修改此代码以满足 (3) 的要求?在当前的实现中,轮询停止了,但是我该如何调用 cancel
?
使用定稿
您可以使用 finalize
调用取消。这可能是这样的:
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => void
): Observable<Response> {
// defer to turn eager promise into lazy observable
return defer(fetch).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
finalize(cancel),
share() // share to allow multiple subscribers
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
回调 complete
tap 操作员可以访问 next
、error
和 complete
发射。对于 callback: () => void
,这已经足够了。
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => void
): Observable<Response> {
// defer to turn eager promise into lazy observable
return defer(fetch).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
tap({
complete: cancel
}),
share() // share to allow multiple subscribers
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
回调 unsubscribe
我认为不存在这样的运算符,但我们可以很容易地创建一个。此运算符只会在取消订阅时触发回调。它会忽略 error
和 complete
.
function onUnsubscribe<T>(
fn: () => void
): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),
error: bindOn("error"),
complete: bindOn("complete")
});
return {
unsubscribe: () => {
fn();
sub.unsubscribe()
}
};
});
}
那么你可以这样使用它:
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => void
): Observable<Response> {
// defer to turn eager promise into lazy observable
return defer(fetch).pipe(
expand(() => timer(1000).pipe(switchMap(fetch))),
takeWhile<Response>(isComplete, false),
onUnsubscribe(cancel),
share() // share to allow multiple subscribers
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}
由于 share
正在管理您的订阅并且共享只会退订一次 refCount < 1
,因此在这种情况下调用取消的唯一方法是没有订阅者。
给猫剥皮的方法不止一种,但我会这样做:
const onUnsubscribe = (callback: () => void) => <T>(source$: Observable<T>) =>
new Observable<T>(observer => {
let isSourceDone = false;
const subscription = source$.subscribe(
val => {
observer.next(val);
},
e => {
isSourceDone = true;
observer.error(e);
},
() => {
isSourceDone = true;
observer.complete();
}
);
return () => {
if (isSourceDone) return;
callback();
subscription.unsubscribe();
};
});
function longPollingAction(
fetch: () => Promise<Response>,
cancel: () => {}
): Observable<Response> {
const lazyFetch$ = defer(() => fetch());
return lazyFetch$.pipe(
expand(() => timer(1000).pipe(mergeMapTo(lazyFetch$))),
takeWhile<Response>(isComplete, false),
onUnsubscribe(cancel),
share()
);
}
function isComplete(r: Response): boolean {
// returns true if r is complete.
}