使用 RxJava 的状态机?

State machine using RxJava?

我正在尝试全力投入 RxJava 并解决我遇到的这个问题,但它似乎非常不适合它,因为 RxJava 似乎不想处理任何类型的状态,而只是传递沿着事件并改变它们来处理它们。

我试图用 RxJava 模拟的基本状态机行为是这样的:

  1. 在应用启动事件中,等待下一个应用暂停。
  2. 在应用暂停事件中,启动 15 分钟计时器并等待下一个应用恢复。
    • 如果应用程序在计时器内恢复,请取消它并返回到步骤 1。
  3. 在应用程序恢复事件中,如果 15 分钟计时器已用完,请刷新并返回步骤 1。

虽然可以在 RxJava 中实现状态机,但它很快就会变得非常丑陋。可以使用switchMap()运算符来select一些分支路径,也可以使用其他几个运算符进行分支合并。

大部分问题是状态机转换看起来更像 go to 而不是结构化编程,而且它们根本不像函数式编程。

出于类似的原因,很难创建任何非平凡状态机的流畅或功能描述。有通用状态机的RxJava实现RxFsm,看起来挺能干的。您将提供定时器作为外部输入,以及其他类似的解决方法。

这是实现状态机的代码,使用了一些 RxJava 元素,并做了一些简化假设:

Observable<AppState> appStateObservable;
AtomicReference<Subscription> timerSubscription = new AtomicReference<>(Subscriptions.empty());

appStateObservable
  .doOnNext( state -> if ( state == START ) doStartup() )
  .filter( state -> state != START )
  .subscribe( state -> if ( state == PAUSE ) {
      timerSubscription.set( Observable.timer(15, MINUTES)
        .subscribe( timerSubscription.get().unsubscribe() ) );
    } else  if ( timerSubscription.get().isUnsubscribed() ) {
      refresh();
    } else {
      timerSubscription.get().unsubscribe();
    } );

它使用 timerSubscription 订阅状态​​来确定计时器是否已触发,并因此执行刷新。