在 rx-java2 中从 `ObservableEmitter` 转换为 `Observer`
Convert from `ObservableEmitter` to `Observer` in rx-java2
有没有简单的方法可以将 io.reactivex.ObservableEmitter<T>
转换为 io.reactivex.Observer<T>
?我在 rx-java2
库中找不到执行此操作的函数。
实施似乎很简单:
public static <T> Observer<T> toObserver(ObservableEmitter<T> oe) {
return new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
oe.setDisposable(d);
}
@Override
public void onNext(T t) {
oe.onNext(t);
}
@Override
public void onError(Throwable e) {
oe.onError(e);
}
@Override
public void onComplete() {
oe.onComplete();
}
};
}
但感觉它应该是标准库实现的一部分,因为它提供了 rx-java2
中两种核心类型之间的转换。
基本上我正在尝试将以下代码从 rxjava 1 迁移到 2
class X<T, O1, O2> implements Transformer<T, Either<O1, O2>> {
Transformer<T, O1> t1;
Transformer<T, O2> t2;
@Override
public Observable<Either<O1, O2>> call(Observable<T> input) {
return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
@Override
public Observable<Either<O1, O2>> call(final T t) {
return Observable.<Either<O1, O2>>create(new OnSubscribe<Either<O1, O2>>() {
@Override
public void call(final Subscriber<? super Either<O1, O2>> sub) {
t1.call(Observable.just(t)).map(o1 -> Either.<O1, O2>left(o1)).subscribe(sub);
t2.call(Observable.just(t)).map(o2 -> Either.<O1, O2>right(o2)).subscribe(sub);
}
});
}
});
}
}
请注意,OnSubscribe
提供了 Subscriber
接口,我可以使用该接口订阅另外两个 Observable's
,需要 rxjava 2 转换。
看来您需要 publish(Function)
:
(您的代码看起来很复杂,并且违反了 v1 中的协议)。
ObservableTransformer<T, O1> t1 = ...
ObservableTransformer<T, O2> t2 = ...
ObservableTransformer<T, Either<O1, O2>> combiner = o ->
o.publish(g -> Observable.merge(
t1.apply(g).map(o1 -> Either.<O1, O2>left(o1)),
t2.apply(g).map(o2 -> Either.<O1, O2>right(o2))
));
如果你真的想坚持使用外部 flatMap
(以防内部异步),请使用 merge()
而不是 create
:
return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
@Override
public Observable<Either<O1, O2>> call(final T t) {
Observable<T> just = Observable.just(t);
Observable.merge(
t1.call(just).map(o1 -> Either.<O1, O2>left(o1)),
t2.call(just).map(o2 -> Either.<O1, O2>right(o2))
)
}
});
有没有简单的方法可以将 io.reactivex.ObservableEmitter<T>
转换为 io.reactivex.Observer<T>
?我在 rx-java2
库中找不到执行此操作的函数。
实施似乎很简单:
public static <T> Observer<T> toObserver(ObservableEmitter<T> oe) {
return new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
oe.setDisposable(d);
}
@Override
public void onNext(T t) {
oe.onNext(t);
}
@Override
public void onError(Throwable e) {
oe.onError(e);
}
@Override
public void onComplete() {
oe.onComplete();
}
};
}
但感觉它应该是标准库实现的一部分,因为它提供了 rx-java2
中两种核心类型之间的转换。
基本上我正在尝试将以下代码从 rxjava 1 迁移到 2
class X<T, O1, O2> implements Transformer<T, Either<O1, O2>> {
Transformer<T, O1> t1;
Transformer<T, O2> t2;
@Override
public Observable<Either<O1, O2>> call(Observable<T> input) {
return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
@Override
public Observable<Either<O1, O2>> call(final T t) {
return Observable.<Either<O1, O2>>create(new OnSubscribe<Either<O1, O2>>() {
@Override
public void call(final Subscriber<? super Either<O1, O2>> sub) {
t1.call(Observable.just(t)).map(o1 -> Either.<O1, O2>left(o1)).subscribe(sub);
t2.call(Observable.just(t)).map(o2 -> Either.<O1, O2>right(o2)).subscribe(sub);
}
});
}
});
}
}
请注意,OnSubscribe
提供了 Subscriber
接口,我可以使用该接口订阅另外两个 Observable's
,需要 rxjava 2 转换。
看来您需要 publish(Function)
:
(您的代码看起来很复杂,并且违反了 v1 中的协议)。
ObservableTransformer<T, O1> t1 = ...
ObservableTransformer<T, O2> t2 = ...
ObservableTransformer<T, Either<O1, O2>> combiner = o ->
o.publish(g -> Observable.merge(
t1.apply(g).map(o1 -> Either.<O1, O2>left(o1)),
t2.apply(g).map(o2 -> Either.<O1, O2>right(o2))
));
如果你真的想坚持使用外部 flatMap
(以防内部异步),请使用 merge()
而不是 create
:
return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
@Override
public Observable<Either<O1, O2>> call(final T t) {
Observable<T> just = Observable.just(t);
Observable.merge(
t1.call(just).map(o1 -> Either.<O1, O2>left(o1)),
t2.call(just).map(o2 -> Either.<O1, O2>right(o2))
)
}
});