奇怪的 takeUntil 在 rxjs 6 中表现

weird takeUntil behave in rxjs 6

首先,开始按钮停止,然后点击restore01按钮和restore02按钮,开始按钮恢复,

然后错误是当单击停止按钮 01 或停止按钮 02 时停止按钮应该停止,但不是

注意console.log,启动时开始按钮停止,然后你点击restore01和restore02,然后开始按钮工作,现在这个bug像冬天一样来了,当我点击stop01按钮或stop02按钮, 在 console.log 它显示停止流 Array [ "r", "" ] 但是开始按钮仍然有效,没有被停止,你能想象我看到这种情况的感觉吗?天哪

online demo

代码已失效

const startDom = document.querySelector("#start")
const numberDom = document.querySelector("#number")
const restoreDom01 = document.querySelector("#restore01")
const stopDom01 = document.querySelector("#stop01")
const restoreDom02 = document.querySelector("#restore02")
const stopDom02 = document.querySelector("#stop02")

let init01 = false
let init02 = false

const origin01$ = new rxjs.Subject()
const origin02$ = new rxjs.Subject()

const restore01$ = rxjs.fromEvent(restoreDom01, "click")
restore01$.subscribe(() => origin01$.next("r"))
const stop01$ = rxjs.fromEvent(stopDom01, "click")
stop01$.subscribe(() => origin01$.next(""))

const restore02$ = rxjs.fromEvent(restoreDom02, "click")
restore02$.subscribe(() => origin02$.next("r"))
const stop02$ = rxjs.fromEvent(stopDom02, "click")
stop02$.subscribe(() => origin02$.next(""))

const [stopReg$, right$] = rxjs
.combineLatest(origin01$, origin02$)
.pipe(rxjs.operators.partition(arr => arr.some(isEmpty)));
const restoreReg$ = right$.pipe(
  rxjs.operators.take(1),
  rxjs.operators.repeatWhen(() => stopReg$)
);


const start$ = rxjs.fromEvent(startDom, "click");
start$
  .pipe(
  rxjs.operators.mapTo(1),
  rxjs.operators.scan((acc, cur) => acc + cur, 0),
  rxjs.operators.takeUntil(stopReg$),
  rxjs.operators.repeatWhen(() => restoreReg$)
)
  .subscribe(x => (numberDom.innerHTML = x));


stopReg$.subscribe(x => console.log("stop stream", x));
restoreReg$.subscribe(x => console.log("restore stream", x));

origin01$.next("")
origin02$.next("")

function isEmpty(n) {
  return n === "";
}

可能不是很清楚,我说一下步骤

  1. 点击2个恢复按钮,一个一个

  2. 点击开始按钮,点击后数字会加一

  3. 点击停止按钮,OK,这是一个问题,现在开始按钮应该停止,但不是,你仍然可以添加一个点击开始按钮,但在控制台中你可以看到停止stream 已订阅,好奇怪

I also have other demo with the same problem using react and rxjs

您的问题可能与您使用 combineLatestSubject 的方式有关。

combineLatest(origin01$, origin02$)

其中origin01$origin02$都是Subject。 Subjects 不会自然地坚持旧的价值观来发射,他们本质上只是 "event emitters",他们会向订阅者广播他们的发射,仅此而已,在发射之后他们不会坚持它。

看起来当 origin01$ 开火时,combineLatest 必须等待另一个 Subject 再次开火才能发出信号。您的 stopReg$ 流可能没有像您预期的那样触发,请像这样用 tap 检查它:

rxjs.operators.repeatWhen(() => 
  stopReg$.pipe(
    rxjs.operators.tap(x => console.log('got to here'))
  )
)

BehaviorSubject 可以提供帮助。这是来自 RxJS docs:

的描述

A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.

如果您将原点 Subject 改为 BehaviorSubject,您可能会得到您希望的预期结果。我试过这个:

const origin01$ = new rxjs.BehaviorSubject("")
const origin02$ = new rxjs.BehaviorSubject("")

在我只点击了一个 "stop" 按钮之后,我无法看到 "start" 扫描流在我点击它时继续对值求和。

我相信这是可行的,因为在 combineLatest 中,因为它是一个 BehaviorSubject,它已经能够获取最新的值,而不必等待另一个 Subject.