在可观察的取消上执行函数
Execute function on observable cancellation
我想要一个可观察的对象,当取消订阅时它会调用一个函数,但只有当它被取消订阅时没有错误并且没有完成。我尝试构建的 Observable 通常会与另一个 Observable 竞争。我想要当另一个 observable "wins" 这个执行一个函数。
我尝试了 finalize 运算符,但它总是执行。
playback.ts
import { timer } from "rxjs";
import { takeUntil, finalize } from "rxjs/operators";
import errorobs$ from "./errorobs";
export default function() {
return timer(10000).pipe(
takeUntil(errorobs$),
finalize(finalFunc)
);
}
function finalFunc() {
console.log("final function executed");
}
errorobs.ts
import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";
export default fromEvent(document.getElementById("errorBtn"), "click").pipe(
map(() => {
throw new Error("my error");
})
);
这里做了个小demohttps://codesandbox.io/s/q7pwowm4l6
点击开始开始"the observable"。
点击取消使另一个可观察者获胜
点击错误产生错误
实现此目的的一种方法是使用自定义运算符,例如下面我的 onCancel()
:
const {Observable} = rxjs
function onCancel(f) {
return observable => new Observable(observer => {
let completed = false
let errored = false
const subscription = observable.subscribe({
next: v => observer.next(v),
error: e => {
errored = true
observer.error(e)
},
complete: () => {
completed = true
observer.complete()
}
})
return () => {
subscription.unsubscribe()
if (!completed && !errored) f()
}
})
}
// Test:
const {interval} = rxjs
const {take} = rxjs.operators
// This one gets cancelled:
const s = interval(200).pipe(
onCancel(() => console.warn('s cancelled!'))
).subscribe(() => {})
setTimeout(() => s.unsubscribe(), 500)
// This one completes before unsubscribe():
const q = interval(200).pipe(
take(2),
onCancel(() => console.warn('q cancelled!'))
).subscribe(() => {})
setTimeout(() => q.unsubscribe(), 500)
<script src="//unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
确实如您描述的那样有效。 finalize
在处理链时执行,即所有订阅者取消订阅时、链出错时或完成时执行。
此功能的 RxJS Github 页面上已经存在问题:https://github.com/ReactiveX/rxjs/issues/2823
在上面的 link 中,您可以看到一个自定义运算符的示例,它向 finalize
运算符添加了原因。
我不得不自己处理这个用例,并将这个运算符添加到我自己的 RxJS 运算符集合中:https://github.com/martinsik/rxjs-extra/blob/master/doc/finalizeWithReason.md
我想要一个可观察的对象,当取消订阅时它会调用一个函数,但只有当它被取消订阅时没有错误并且没有完成。我尝试构建的 Observable 通常会与另一个 Observable 竞争。我想要当另一个 observable "wins" 这个执行一个函数。
我尝试了 finalize 运算符,但它总是执行。
playback.ts
import { timer } from "rxjs";
import { takeUntil, finalize } from "rxjs/operators";
import errorobs$ from "./errorobs";
export default function() {
return timer(10000).pipe(
takeUntil(errorobs$),
finalize(finalFunc)
);
}
function finalFunc() {
console.log("final function executed");
}
errorobs.ts
import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";
export default fromEvent(document.getElementById("errorBtn"), "click").pipe(
map(() => {
throw new Error("my error");
})
);
这里做了个小demohttps://codesandbox.io/s/q7pwowm4l6
点击开始开始"the observable"。
点击取消使另一个可观察者获胜
点击错误产生错误
实现此目的的一种方法是使用自定义运算符,例如下面我的 onCancel()
:
const {Observable} = rxjs
function onCancel(f) {
return observable => new Observable(observer => {
let completed = false
let errored = false
const subscription = observable.subscribe({
next: v => observer.next(v),
error: e => {
errored = true
observer.error(e)
},
complete: () => {
completed = true
observer.complete()
}
})
return () => {
subscription.unsubscribe()
if (!completed && !errored) f()
}
})
}
// Test:
const {interval} = rxjs
const {take} = rxjs.operators
// This one gets cancelled:
const s = interval(200).pipe(
onCancel(() => console.warn('s cancelled!'))
).subscribe(() => {})
setTimeout(() => s.unsubscribe(), 500)
// This one completes before unsubscribe():
const q = interval(200).pipe(
take(2),
onCancel(() => console.warn('q cancelled!'))
).subscribe(() => {})
setTimeout(() => q.unsubscribe(), 500)
<script src="//unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
确实如您描述的那样有效。 finalize
在处理链时执行,即所有订阅者取消订阅时、链出错时或完成时执行。
此功能的 RxJS Github 页面上已经存在问题:https://github.com/ReactiveX/rxjs/issues/2823
在上面的 link 中,您可以看到一个自定义运算符的示例,它向 finalize
运算符添加了原因。
我不得不自己处理这个用例,并将这个运算符添加到我自己的 RxJS 运算符集合中:https://github.com/martinsik/rxjs-extra/blob/master/doc/finalizeWithReason.md