RxJS - 使具有重置无状态的计数器?

RxJS - make counter with reset stateless?

假设我有以下标记:

<button id="dec">-</button>
<output id="out">0</output>
<button id="inc">+</button>
<button id="res">RESET</button>

以及以下 Rx.js 脚本:

var total = 0

Rx.Observable.merge(
    // decrement
    Rx.Observable.fromEvent($('#dec'), 'click')
    .map(function() { return -1 }),

    // increment
    Rx.Observable.fromEvent($('#inc'), 'click')
    .map(function() { return +1 }),

    // reset
    Rx.Observable.fromEvent($('#res'), 'click')
    .map(function() { return -total })
) // merge
.forEach(function(delta) {
    total += delta
    $('#out').text(total)
})

一切正常。单击 +/- increments/decrements 计数器,然后单击 'RESET' 将其重置为零,但是...我在顶部有变量 'total'。这就是状态,如果我认同函数式反应式编程的价值观,它就是邪恶的,不是吗?如果是这样,我该如何补救?如果我没有重置按钮,我可以只使用 scan(seed, accumulator),但是重置按钮的功能让我陷入了循环,至于如何做到这一点 'stateless'.

Working fiddle here.

我看到有两种解决方法。

首先,没有任何内容表明您不能在管道中增加数据:

Rx.Observable.merge(
  // decrement
  Rx.Observable.fromEvent($('#dec'), 'click')
    .map(function() { return {delta : -1}; }),

  // increment
  Rx.Observable.fromEvent($('#inc'), 'click')
    .map(function() { return {delta : +1}; }),

  // reset
  Rx.Observable.fromEvent($('#res'), 'click')
    .map(function() { return {reset : true}; })
) // merge
.scan(0, function(acc, value) {
  return value.reset ? 0 : acc + value.delta;
})
.forEach(function(delta) {
  $('#out').text(delta)
});

上面的内容让你可以通过添加一个字段向下游发出流已被重置的信号(注意:我作弊了,为了可读性你可能想要添加 reset : false 而不是依赖虚假,但它是由你决定。

或者,如果您将 reset 视为实际重置流,那么您可以改为使用 flatMapLatest 来包装递增和递减:

Rx.Observable.fromEvent($('#res'), 'click')
.startWith(null)
.flatMapLatest(function(e) {
  return Rx.Observable.merge(
    // decrement
    Rx.Observable.fromEvent($('#dec'), 'click')
      .map(function() { return -1 }),

    // increment
    Rx.Observable.fromEvent($('#inc'), 'click')
      .map(function() { return +1 })
  )
  .scan(0, function(acc, delta) { return acc + delta })
  .startWith(0);
})
.subscribe(function(value) {
   $('#out').text(value) 
});

这使得流比包含两个 .startWith 来启动相应序列所必须的更加混乱,但是如果您反对扩充并希望状态隐式控制流然后这将是一种方法。

@Jrop 我喜欢这个运算符 Rx.Observable.when. With this one you can reproduce Bacon.update very easy. This is my code and jsbin 示例:

const {when, fromEvent} = Rx.Observable;

const decObs = fromEvent(document.getElementById('dec'), 'click');
const incObs = fromEvent(document.getElementById('inc'), 'click');
const resetObs = fromEvent(document.getElementById('res'), 'click');

when(
    decObs.thenDo(_ => prev => prev - 1),
    incObs.thenDo(_ => prev => prev + 1),
    resetObs.thenDo(_ => prev => 0)
).startWith(0).scan((prev, f) => f(prev))
.subscribe(v => document.getElementById('out').innerHTML = v);

再看看这个就更好了Join-calculus, New Release and Joins and this Combining sequences

根据@paulpdaniels 的回答,这是我在 Ramda 中使用的内容:

var hardSet  = Rx.Observable.fromEvent($('#set');
var decRes   = Rx.Observable.fromEvent($('#dec');
var incRes   = Rx.Observable.fromEvent($('#inc');

Rx.Observable.merge(
  incRes.map(function()  { return R.add(1); }),
  decRes.map(function()  { return R.add(-1); }),
  hardSet.map(function() { return R.always(0); })
).scan(function(prev, f) { 
  return f(prev); 
}, 0);

RxJs v6 的实现方式(重置 + 输入以更改步骤)如下:

const { fromEvent, merge } = rxjs;
const { map, mapTo, startWith, scan, withLatestFrom } = rxjs.operators;

const resultEl = document.getElementById("js-result"),
  stepInpEl = document.getElementById("js-step-inp"),
  btnDecEl = document.getElementById("js-btn-dec"),
  btnIncEl = document.getElementById("js-btn-inc"),
  btnReset = document.getElementById("js-btn-reset");

// observables (5)
const step$ = fromEvent(stepInpEl, "input").pipe(
  startWith({ target: { value: 5 } }),
  map(e => Number(e.target.value))
);

const inc$ = fromEvent(btnIncEl, "click").pipe(
  withLatestFrom(step$),
  map(([e, step]) => step )
);

const dec$ = fromEvent(btnDecEl, "click").pipe(
  withLatestFrom(step$),
  map(([e, step]) => -step )
);

const reset$ = fromEvent(btnReset, "click").pipe(
  mapTo( 0 )
);

const counter$ = merge(dec$, inc$, reset$).pipe(
  startWith( 0 ),
  scan((acc, value) => value && acc + value)
);

// subscriptions (2)
counter$.subscribe(val => resultEl.innerHTML = val);
step$.subscribe(val => stepInpEl.value = val);

标记:

<h2>RxJS (v6) counter</h2>
<em>5 observables, 2 subscriptions</em>
<h3>Result: <span id="js-result"></span></h3>

<button id="js-btn-reset">Reset</button>

<input type="number" id="js-step-inp" class="stepInp">

<button id="js-btn-dec">Decrement</button>

<button id="js-btn-inc">Increment</button>

codepen

上的实例