在可观察的取消上执行函数

Execute function on observable cancellation

我想要一个可观察的对象,当取消订阅时它会调用一个函数,但只有当它被取消订阅时没有错误并且没有完成。我尝试构建的 Observable 通常会与另一个 Observable 竞争。我想要当另一个 observable "wins" 这个执行一个函数。

我尝试了 finalize 运算符,但它总是执行。

playback.ts

import { timer } from "rxjs";
import { takeUntil, finalize } from "rxjs/operators";
import errorobs$ from "./errorobs";

export default function() {
  return timer(10000).pipe(
    takeUntil(errorobs$),
    finalize(finalFunc)
  );
}

function finalFunc() {
  console.log("final function executed");
}

errorobs.ts

import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";

export default fromEvent(document.getElementById("errorBtn"), "click").pipe(
  map(() => {
    throw new Error("my error");
  })
);

这里做了个小demohttps://codesandbox.io/s/q7pwowm4l6

点击开始开始"the observable"。

点击取消使另一个可观察者获胜

点击错误产生错误

实现此目的的一种方法是使用自定义运算符,例如下面我的 onCancel()

const {Observable} = rxjs

function onCancel(f) {
  return observable => new Observable(observer => {
    let completed = false
    let errored = false
    const subscription = observable.subscribe({
      next: v => observer.next(v),
      error: e => {
        errored = true
        observer.error(e)
      },
      complete: () => {
        completed = true
        observer.complete()
      }
    })
    return () => {
      subscription.unsubscribe()
      if (!completed && !errored) f()
    }
  })
}

// Test:
const {interval} = rxjs
const {take} = rxjs.operators

// This one gets cancelled:
const s = interval(200).pipe(
  onCancel(() => console.warn('s cancelled!'))
).subscribe(() => {})
setTimeout(() => s.unsubscribe(), 500) 

// This one completes before unsubscribe():
const q = interval(200).pipe(
  take(2),
  onCancel(() => console.warn('q cancelled!'))
).subscribe(() => {})
setTimeout(() => q.unsubscribe(), 500)
<script src="//unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

确实如您描述的那样有效。 finalize 在处理链时执行,即所有订阅者取消订阅时、链出错时或完成时执行。

此功能的 RxJS Github 页面上已经存在问题:https://github.com/ReactiveX/rxjs/issues/2823

在上面的 link 中,您可以看到一个自定义运算符的示例,它向 finalize 运算符添加了原因。

我不得不自己处理这个用例,并将这个运算符添加到我自己的 RxJS 运算符集合中:https://github.com/martinsik/rxjs-extra/blob/master/doc/finalizeWithReason.md