延迟项目发射,直到项目从另一个可观察对象发射

Delay items emission until item is emitted from another observable

现在使用 RxJava 并偶然发现以下问题:

我有 2 个不同的流:

所以基本上我有项目流,我希望所有这些项目与第二个流中的单个项目组合:

----a1----a2----a3----a4----a5----|---------------- >

------------b1--|---------------------------- ------>

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

------------a1b1-a2b1-a3b1-a4b1-a5b1-------->

它看起来非常类似于 combileLatest 运算符,但 combineLatest 将忽略第一个流中的所有项目,除了最接近第二个流中的项目。这意味着我不会收到 a1b1 - 发出的第一个结果项目将是 a2b1.

我也看了delay operator, but it doesn't allow me to specify close stream like it is done with buffer运算符

有没有什么奇特的运算符可以解决上面的问题?

据我所知,没有内置运算符可以实现您描述的行为。您始终可以实现自定义运算符或在现有运算符之上构建它。我认为第二个选项更容易实现,这里是代码:

public static <L, R, T> Observable<T> zipper(final Observable<? extends L> left, final Observable<? extends R> right, final Func2<? super L, ? super R, ? extends T> function) {
    return Observable.defer(new Func0<Observable<T>>() {
        @Override
        public Observable<T> call() {
            final SerialSubscription subscription = new SerialSubscription();
            final ConnectableObservable<? extends R> cached = right.replay();

            return left.flatMap(new Func1<L, Observable<T>>() {
                @Override
                public Observable<T> call(final L valueLeft) {
                    return cached.map(new Func1<R, T>() {
                        @Override
                        public T call(final R valueRight) {
                            return function.call(valueLeft, valueRight);
                        }
                    });
                }
            }).doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.set(cached.connect());
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.unsubscribe();
                }
            });
        }
    });
}

如果对代码有任何疑问,我可以详细解释。

更新

关于询问我的解决方案与以下解决方案有何不同:

left.flatMap(valueLeft -> right.map(valueRight -> together(valueLeft, valueRight)));
  1. 并行执行——在我的实现中,leftright observables 都是并行执行的。 right observable 不必等待 left 发出它的第一个项目。
  2. 缓存 - 我的解决方案只订阅一次 right observables 并缓存其结果。这就是为什么 b1 对于所有 aXXX 项目总是相同的。 提供的解决方案在每次 left 发出一个项目时订阅 rightobservable。这意味着:

    • 无法保证 b1 不会更改其值。例如,对于以下可观察对象,每个 a.

      都会得到不同的 b
      final Observable<Double> right = Observable.defer(new Func0<Observable<Double>>() {
          @Override
          public Observable<Double> call() {
              return Observable.just(Math.random());
          }
      });
      
    • 如果 right observable 是一个耗时的操作(例如网络调用),那么每次 left observable 发出一个新项目时你都必须等待它完成.

有几种方法可以做到这一点:

1) flatMap 超过 b 如果你不需要提前开始 a

b.flatMap(bv -> a.map(av -> together(av, bv)));

2) 当然,您可以 cache,但它会在整个直播过程中保留您的 a

3) 以不同寻常的方式使用 groupBy,因为它的 GroupedObservable 缓存值直到单个订阅者到达,重放缓存值并继续作为常规直接可观察值(让所有以前的缓存值消失)。

Observable<Long> source = Observable.timer(1000, 1000, TimeUnit.MILLISECONDS)
        .doOnNext(v -> System.out.println("Tick"))
        .take(10);
Observable<String> other = Observable.just("-b").delay(5000, TimeUnit.MILLISECONDS)
        .doOnNext(v -> System.out.println("Tack"))
        ;

source.groupBy(v -> 1)
.flatMap(g -> 
    other.flatMap(b -> g.map(a -> a + b))
).toBlocking().forEach(System.out::println);

它的工作原理如下:

  • 通过将源中的所有内容分组到第 1 组,抓住 GroupedObservable
  • 当小组 g 到达时,我们 'start observing' 另一个可观察者。
  • 一旦其他元素触发了它的元素,我们就获取它并将其映射到组上,'start observing' 它也一样,为我们带来 a + bs 的最终序列。

我添加了 doOnNexts 以便您可以看到源在 other 触发其 "Tack" 之前确实处于活动状态。