在 rx-java2 中从 `ObservableEmitter` 转换为 `Observer`

Convert from `ObservableEmitter` to `Observer` in rx-java2

有没有简单的方法可以将 io.reactivex.ObservableEmitter<T> 转换为 io.reactivex.Observer<T>?我在 rx-java2 库中找不到执行此操作的函数。

实施似乎很简单:

 public static <T> Observer<T> toObserver(ObservableEmitter<T> oe) {
    return new Observer<T>() {

        @Override
        public void onSubscribe(Disposable d) {
            oe.setDisposable(d);
        }

        @Override
        public void onNext(T t) {
            oe.onNext(t);
        }

        @Override
        public void onError(Throwable e) {
            oe.onError(e);
        }

        @Override
        public void onComplete() {
            oe.onComplete();
        }
    };
}

但感觉它应该是标准库实现的一部分,因为它提供了 rx-java2 中两种核心类型之间的转换。

基本上我正在尝试将以下代码从 rxjava 1 迁移到 2

class X<T, O1, O2> implements Transformer<T, Either<O1, O2>> {

Transformer<T, O1> t1;
Transformer<T, O2> t2;

@Override
public Observable<Either<O1, O2>> call(Observable<T> input) {
    return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
        @Override
        public Observable<Either<O1, O2>> call(final T t) {
            return Observable.<Either<O1, O2>>create(new OnSubscribe<Either<O1, O2>>() {
                @Override
                public void call(final Subscriber<? super Either<O1, O2>> sub) {
                    t1.call(Observable.just(t)).map(o1 -> Either.<O1, O2>left(o1)).subscribe(sub);
                    t2.call(Observable.just(t)).map(o2 -> Either.<O1, O2>right(o2)).subscribe(sub);
                }
            });
        }
    });
}

}

请注意,OnSubscribe 提供了 Subscriber 接口,我可以使用该接口订阅另外两个 Observable's,需要 rxjava 2 转换。

看来您需要 publish(Function): (您的代码看起来很复杂,并且违反了 v1 中的协议)。

ObservableTransformer<T, O1> t1 = ...
ObservableTransformer<T, O2> t2 = ...

ObservableTransformer<T, Either<O1, O2>> combiner = o -> 
    o.publish(g -> Observable.merge(
        t1.apply(g).map(o1 -> Either.<O1, O2>left(o1)),
        t2.apply(g).map(o2 -> Either.<O1, O2>right(o2))
    ));

如果你真的想坚持使用外部 flatMap(以防内部异步),请使用 merge() 而不是 create:

return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
    @Override
    public Observable<Either<O1, O2>> call(final T t) {
        Observable<T> just = Observable.just(t);
        Observable.merge(
            t1.call(just).map(o1 -> Either.<O1, O2>left(o1)),
            t2.call(just).map(o2 -> Either.<O1, O2>right(o2))
        )
    }
});