如何重置 BehaviorSubject

How to reset a BehaviorSubject

我有一个 BehaviorSubject 我想重置 - 我的意思是我希望最新的值不可用,就好像它是刚刚创建的一样。

我似乎没有看到 API 可以做到这一点,但我想还有另一种方法可以达到相同的结果?

我想要的行为是我需要发出事件,并且我希望订阅者在订阅时获得最新事件 - 如果特定管理器处于 'started' 状态。但是当这个管理器是 'stopped' 时,最新的事件不应该可用(就像它从来没有开始过一样)。

我假设您想清除 BehaviorSubject(否则不要对其调用 onComplete)。这不受支持,但您可以通过让消费者忽略当前值来实现类似的效果:

public static final Object EMPTY = new Object();

BehaviorSubject<Object> subject = BehaviorSubject.createDefault(EMPTY);

Observable<YourType> obs = subject.filter(v -> v != EMPTY).cast(YourType.class);

obs.subscribe(System.out::println);

// send normal data
subject.onNext(1);
subject.onNext(2);

// clear the subject
subject.onNext(EMPTY);

// this should not print anything
obs.subscribe(System.out::println);

另一种打开和关闭 observable 值的方法是使用 switchMap() 在实际 observable 和空值之间切换。

假设您有一个管理器对象,并且它有一个显示其状态的可观察对象。那么,

subjectObservable = manager.getStateObservable()
  .switchMap( state -> state == ON ? subject : Observable.never() );

只会在 manager 处于 ON 状态时发出值。

对于某些情况,我找到了更好的解决方案,并且是:

subject.skiplast(1)

它可以清除流中由于BehaviorSubject"behavior"

而保留的最后一个位置

@akarnokd 的回答的一个问题是 .cast 阻止 YourType 成为接口或通用类型,例如 List<String>.

另一种选择是在您可以打开和关闭的布尔字段上进行过滤。

    private BehaviorSubject<PandoraApp> subject = BehaviorSubject.create();
    private boolean enabled = true;

    Observable<PandoraApp> observable = subject.filter(v -> enabled);

如果在不同的线程上调用方法,您可以使用 AtomicBoolean 作为过滤器标志。

像这样使用setTimeout:

setOtpoint(value) {

    this._setOption.next(value);

    // Clear BehaviorSubject after emit value

    setTimeout(() => {
      this._setOption.next(null);
    }, 100);

  }

这是我的库:​​

implementation "com.github.kolyall:rxjava2-empty:1.0.36"

示例:


private val myBehaviorSubject = BehaviorSubjectOptional.createOptional<MyItem?>()

errorBehaviorSubject.toObservable()
.subscribe{ item-> Log.d("onNext1", "item = $item")}

var item:MyItem? = MyItem()
myBehaviorSubject.onNextOptional(item)

//For reset:
myBehaviorSubject.clear()
//OR
item = null
myBehaviorSubject.onNextOptional(item)


errorBehaviorSubject.toObservable()
.subscribe{ item-> Log.d("onNext2", "item = $item")}