Observable 的扁平化 Observable

Flatten Observable of Observables

我想做的是创建一个每秒运行另一个函数的函数。第二个函数 returns Observables<A> 我希望第一个函数也 return Observables<A> 而不是 Observable<Observable<A>>

例如:

private A calcA(){
   ...
   return new A(...)
}

public Observable<A> getAs(){
   return Observable.create( subscriber -> {
      Bool condition = ...
      do {
         subscriber.onNext(calcA())
      } while (condition)
      subscriber.onComplete()
   })
}

public Observable<A> pollAs(){
   return Observable.create(subscriber -> {
      do {
         subscriber.onNext(getAs()) // Flatten here I guess
         Thread.sleep(1000)
      } while(true)
   })

所以我想做类似的事情(我试着用 Java-ish 的方式写这个,但我会使用 Kotlin)

我想到了这个解决方案:

public Observable<A> pollAs() {
   return Observable.create(subscriber -> {
       do {
           getAs().subscribe(
                   { subscriber.onNext(it) },
                   { subscriber.onError(it) },
                   { Thread.sleep(1000) }
           )
       } while (true)
   })
}

我不太喜欢这个,谁能告诉我一个更方便的方法?

您不需要使用 flatMap() 运算符来展平内部 observable,因为您只想重复订阅同一个 observable。

public Observable<A> getAs() {
   return Observable.fromCallable( () -> calcA() )
            .repeat()
            .takeWhile( v -> !condition( v );
}

getAs() 将发出物品,直到达到条件。然后它就会完成。

public Observable<A> pollAs(){
   return getAs()
            .repeatWhen( completed -> completed.delay(1000, TimeUnit.MILLISECONDS) );

pollAs() 将不断重新订阅 getAs() observable,在每次订阅之间暂停一秒钟。

编辑:我已经将一个持续 6 个月的示例上传到 https://pastebin.com/kSmi24GF 说明你要不断的把数据出来的时间提前。