Subscribewith VS subscribe in RxJava2(Android)?
Subscribewith Vs subscribe in RxJava2(Android)?
什么时候调用 subscribeWith 方法而不是简单的订阅?用例是什么?
compositeDisposable.add(get()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(this::handleResponse, this::handleError));
VS
compositeDisposable.add(get()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
// .subscribe(this::handleResponse, this::handleError);
.subscribeWith(new DisposableObserver<News>() {
@Override public void onNext(News value) {
handleResponse(value);
}
@Override public void onError(Throwable e) {
handleError(e);
}
@Override public void onComplete() {
// dispose here ? why? when the whole thing will get disposed later
//via compositeDisposable.dispose(); in onDestroy();
}
}));
谢谢
稍后添加
根据文档,return 个一次性 SingleObserver 实例:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
subscribe(s);
return s;
}
其中 ConsumerSingleObserver class 实现了 SingleObserver 和 Disposable。
Observable#订阅说明:
在您的第一个代码片段中:
.subscribe(this::handleResponse, this::handleError));
您实际上使用的是几种重载的 Observable#subscribe
方法之一:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
还有一个也接受 Action
来执行 onComplete:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
另一个选项允许您简单地传入 Observer
(注意:void 方法) (编辑 2 - 此方法定义在ObservableSource
,这是Observable
扩展的接口。)
public final void subscribe(Observer<? super T> observer)
在您问题的第二个代码片段中,您使用了 subscribeWith
方法,该方法只是 return 您传入的 Observer
(对于 convenience/caching 等):
public final <E extends Observer<? super T>> E subscribeWith(E observer)
Observer#on完整解释:
Observer#onComplete 在 Observable 发出流中的所有项目后被调用。
来自 java 文档:
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
例如,如果您的代码片段中的 get()
return 编辑了一个 Observable
发出多个 News
对象,每个对象都将在 Observer#onNext
。在这里您可以处理每个项目。
在它们全部被处理后(假设没有错误发生),然后 onComplete
将被调用。在这里你可以执行你需要做的任何额外操作(例如更新 UI)知道你已经处理了所有 News
个对象。
这不要与 Disposable#dispose
混淆,后者在可观察流结束时被调用 (complete/error),或者由您手动终止观察(这是 CompositeDisposable
的出现是因为它可以帮助您一次性处理掉其中包含的所有 Disposable
。
如果在您的场景中 get()
将 return 一个仅发出单个项目的 Observable
,那么不要使用 Observable
,而是考虑使用 io.reactivex.Single
只处理一个项目(在 onSuccess
中),不需要为 onComplete 指定 Action
:)
编辑:回复您的评论:
However I still do not get use of subscribeWith, you said it passes
the observer for caching etc , where does it pass to? on complete? and
from what I understood subscribeWith is not actually consuming the
observable( or Single) right?
为了进一步阐明 subscribeWith
的解释,我的意思是它 将 消耗您传递给 [=25= 的 Observer
对象](与 subscribe
方法完全一样)但是它还会 return 同一个 Observer 直接返回给你。在撰写本文时,subscribeWith 的实现是:
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
因此,subscribeWith
可以与subscribe
互换使用。
Can you give a use case of subscribeWith with example? I guess that
will answer the question completely
subscribeWith
java文档给出了以下用法示例:
Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();
ResourceObserver<Integer> rs = new ResourceObserver<>() {
// ...
};
composite.add(source.subscribeWith(rs));
请参阅此处 subscribeWith
的用法将 return 实例化的相同 ResourceObserver
对象。这为执行订阅和在一行中将 ResourceObserver
添加到 CompositeDisposable
提供了便利(请注意 ResourceObservable
实现了 Disposable
。)
编辑 2 回复第二条评论。
source.subscribeWith(rs); source.subscribe(rs); both return
SingleObserver instance,
ObservableSource#subscribe(Observer <? super T> observer)
不 return Observer
。这是一个无效的方法(参见上面 Observable#subscribe 解释下的注释。)而 Observable#subscribeWith
DOES return Observer
。
如果我们要改用 ObservableSource#subscribe
重写示例使用代码,我们必须像这样分两行进行:
source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);
而 Observable#subscribeWith
方法使我们可以方便地在一行中完成上述操作 composite.add(source.subscribeWith(rs));
它可能会混淆所有看起来有些相似的重载订阅方法,但是存在差异(其中一些是细微的)。查看代码和文档有助于区分它们。
编辑 3 subscribeWith 的另一个示例用例
subscribeWith
方法在您具有可能想要重用的 Observer
的特定实现时很有用。例如,在上面的示例代码中,它在订阅中提供了 ResourceObserver
的特定实现,从而继承了它的功能,同时仍然允许您处理 onNext onError 和 onComplete.
另一个示例使用:对于您问题中的示例代码,如果您想在多个地方对 get()
响应执行相同的订阅怎么办?
与其在不同的 class 中复制 onNext 和 onError 的 Consumer
实现,不如为例如
定义一个新的 class。
//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
//implement your onNext, onError, onComplete.
....
}
现在,无论何时执行 get()
请求,您都可以通过以下方式简单地订阅:
compositeDisposable.add(get()
...
.subscribeWith(new GetNewsObserver()));
看到代码现在很简单,您保持处理响应的职责分离,现在可以在任何需要的地方重用 GetNewsObserver
。
什么时候调用 subscribeWith 方法而不是简单的订阅?用例是什么?
compositeDisposable.add(get()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(this::handleResponse, this::handleError));
VS
compositeDisposable.add(get()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
// .subscribe(this::handleResponse, this::handleError);
.subscribeWith(new DisposableObserver<News>() {
@Override public void onNext(News value) {
handleResponse(value);
}
@Override public void onError(Throwable e) {
handleError(e);
}
@Override public void onComplete() {
// dispose here ? why? when the whole thing will get disposed later
//via compositeDisposable.dispose(); in onDestroy();
}
}));
谢谢
稍后添加
根据文档,return 个一次性 SingleObserver 实例:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
subscribe(s);
return s;
}
其中 ConsumerSingleObserver class 实现了 SingleObserver 和 Disposable。
Observable#订阅说明:
在您的第一个代码片段中:
.subscribe(this::handleResponse, this::handleError));
您实际上使用的是几种重载的 Observable#subscribe
方法之一:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
还有一个也接受 Action
来执行 onComplete:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
另一个选项允许您简单地传入 Observer
(注意:void 方法) (编辑 2 - 此方法定义在ObservableSource
,这是Observable
扩展的接口。)
public final void subscribe(Observer<? super T> observer)
在您问题的第二个代码片段中,您使用了 subscribeWith
方法,该方法只是 return 您传入的 Observer
(对于 convenience/caching 等):
public final <E extends Observer<? super T>> E subscribeWith(E observer)
Observer#on完整解释:
Observer#onComplete 在 Observable 发出流中的所有项目后被调用。 来自 java 文档:
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
例如,如果您的代码片段中的 get()
return 编辑了一个 Observable
发出多个 News
对象,每个对象都将在 Observer#onNext
。在这里您可以处理每个项目。
在它们全部被处理后(假设没有错误发生),然后 onComplete
将被调用。在这里你可以执行你需要做的任何额外操作(例如更新 UI)知道你已经处理了所有 News
个对象。
这不要与 Disposable#dispose
混淆,后者在可观察流结束时被调用 (complete/error),或者由您手动终止观察(这是 CompositeDisposable
的出现是因为它可以帮助您一次性处理掉其中包含的所有 Disposable
。
如果在您的场景中 get()
将 return 一个仅发出单个项目的 Observable
,那么不要使用 Observable
,而是考虑使用 io.reactivex.Single
只处理一个项目(在 onSuccess
中),不需要为 onComplete 指定 Action
:)
编辑:回复您的评论:
However I still do not get use of subscribeWith, you said it passes the observer for caching etc , where does it pass to? on complete? and from what I understood subscribeWith is not actually consuming the observable( or Single) right?
为了进一步阐明 subscribeWith
的解释,我的意思是它 将 消耗您传递给 [=25= 的 Observer
对象](与 subscribe
方法完全一样)但是它还会 return 同一个 Observer 直接返回给你。在撰写本文时,subscribeWith 的实现是:
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
因此,subscribeWith
可以与subscribe
互换使用。
Can you give a use case of subscribeWith with example? I guess that will answer the question completely
subscribeWith
java文档给出了以下用法示例:
Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();
ResourceObserver<Integer> rs = new ResourceObserver<>() {
// ...
};
composite.add(source.subscribeWith(rs));
请参阅此处 subscribeWith
的用法将 return 实例化的相同 ResourceObserver
对象。这为执行订阅和在一行中将 ResourceObserver
添加到 CompositeDisposable
提供了便利(请注意 ResourceObservable
实现了 Disposable
。)
编辑 2 回复第二条评论。
source.subscribeWith(rs); source.subscribe(rs); both return SingleObserver instance,
ObservableSource#subscribe(Observer <? super T> observer)
不 return Observer
。这是一个无效的方法(参见上面 Observable#subscribe 解释下的注释。)而 Observable#subscribeWith
DOES return Observer
。
如果我们要改用 ObservableSource#subscribe
重写示例使用代码,我们必须像这样分两行进行:
source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);
而 Observable#subscribeWith
方法使我们可以方便地在一行中完成上述操作 composite.add(source.subscribeWith(rs));
它可能会混淆所有看起来有些相似的重载订阅方法,但是存在差异(其中一些是细微的)。查看代码和文档有助于区分它们。
编辑 3 subscribeWith 的另一个示例用例
subscribeWith
方法在您具有可能想要重用的 Observer
的特定实现时很有用。例如,在上面的示例代码中,它在订阅中提供了 ResourceObserver
的特定实现,从而继承了它的功能,同时仍然允许您处理 onNext onError 和 onComplete.
另一个示例使用:对于您问题中的示例代码,如果您想在多个地方对 get()
响应执行相同的订阅怎么办?
与其在不同的 class 中复制 onNext 和 onError 的 Consumer
实现,不如为例如
//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
//implement your onNext, onError, onComplete.
....
}
现在,无论何时执行 get()
请求,您都可以通过以下方式简单地订阅:
compositeDisposable.add(get()
...
.subscribeWith(new GetNewsObserver()));
看到代码现在很简单,您保持处理响应的职责分离,现在可以在任何需要的地方重用 GetNewsObserver
。