满足条件时暂停史诗,然后在条件完成时发出缓冲操作

Pause epic when criteria met, then emit buffered action when criteria complete

我正在开发一个应用程序,当用户在页面之间导航时,我会定期将信息保存到服务器。

我们目前通过安排一个 "persist" 动作来做到这一点,该动作在以 "persist_end" 动作结束之前传播一系列事件。目前,如果用户快速导航,这些分组操作会相互拦截,从而导致各种问题。我以为我可以缓冲开始动作并等到结束动作被执行。

我使用来自 Redux-Observables 站点的 ping-pong 示例创建了一个类似的示例:https://codepen.io/dualcyclone/pen/GOZRxW?editors=0011

const actionPauser = new BehaviorSubject(false);

const pingEpic = action$ =>
    action$.ofType(PING)
      .do(action => console.log(action)) // check if action caught by epic
      .buffer(actionPauser.asObservable().filter(paused => !paused))
      .do(eh => console.log('buffered? ',eh)) // check if buffered actions is occurring
      .do(() => actionPauser.next(true)) // tell pauser to pause
      .map((buf) => buf[buf.length-1])
      .filter(action => action !== undefined)
      .delay(1000)
      .mapTo({ type: PONG });

const pauseEpic = action$ =>
    action$.ofType(PONG)
      .delay(1000)
      .do(() => actionPauser.next(false)) // tell pauser to not pause
      .mapTo({ type: PING });

前提是相似的,我允许用户按他们喜欢的次数按 "start PING" 按钮,正在收听这个的史诗应该检查当前是否有 ping 操作(通过 "actionPauser" BehaviorSubject),并将任何操作排队,直到上一个 ping 操作完成。

史诗应该发出最近的缓冲动作,所以它过滤缓冲列表然后通过最新的。

我似乎无法理解的是 - 控制台日志指示在页面加载后立即触发了多少缓冲操作;这可能表明构建方式存在问题 - 我是否遗漏了什么?

听起来 concatMap 在这种情况下可能效果很好。

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

所以在下面的例子中,一次只有一个计时器是运行。在前一个仍在等待时传入的任何 PING 都将被缓冲。

const pingEpic = action$ =>
  action$.ofType(PING)
    .concatMap(() =>
      Observable.timer(1000)
        .mapTo({ type: PONG })
    );

https://jsbin.com/gocezut/edit?js,output

(rapidly click four times)
PING
PING
PING
PING
(1000ms pass)
PONG
(1000ms pass)
PONG
(1000ms pass)
PONG
(1000ms pass)
PONG

请记住,大多数 redux-observable 问题都可以重新构造为常规 RxJS 问题,从而拓宽资源范围并帮助您找到。这就是 redux-observable 的美妙之处:它几乎完全只是常规的 RxJS 模式。

因此,虽然操作的输出并不完全令人满意(考虑到开始操作是由用户事件发出的,我对此无能为力),但 Cartant 推荐的建议实际上完全符合我的需要.

Audit:

Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

本质上,这让我可以忽略多个发出的 'PING' 事件,而其中一个事件正在发生。然后它会继续执行最近的'PING'事件,所以我们看到的输出如下:

(click) PING (click) PING (click) PING (click) PING PONG DONE PONG DONE

第一个和最后一个 'PING' 动作是唯一通过 Epic 传播的动作,所以我们看到两个最终的 PONG 动作,都跟在一个 DONE 动作之后。

所以,这是回答的例子(在我的代码笔上也看到了 here

const pingEpic = action$ =>
  action$.ofType(PING)
    .audit(() => actionPauser.filter(paused => !paused))
    .do(() => actionPauser.next(true))
    .delay(1000)
    .mapTo({ type: PONG });

const pauseEpic = action$ =>
  action$.ofType(PONG)
    .delay(1000)
    .mapTo({ type: DONE })
    .do(() => actionPauser.next(false));