RxJava 链式 Observable 和 NetworkMainThreadException
RxJava Chained Observables and NetworkMainThreadException
所以我有这个代码:
public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
return Observable.<AbstractXMPPConnection>create(subscriber -> {
try {
AbstractXMPPConnection connection2 = connection.connect();
if (connection2.isConnected()) {
subscriber.onNext(connection2);
subscriber.onCompleted();
}
} catch (SmackException | IOException | XMPPException e) {
e.printStackTrace();
subscriber.onError(e);
}
})
.doOnError(throwable -> LOGI("111", "Connection OnError called"));
}
public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
return connect(connection)
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
.flatMap(pair -> {
if (pair.second == MAX_LOGIN_TRIES)
return Observable.error(pair.first);
return Observable.timer(pair.second, TimeUnit.SECONDS);
}));
}
public void connect() {
assertTrue("To start a connection to the server, you must first call init() method!",
this.connectionConfig != null);
connectionHelper.connectWithRetry(connection)
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<AbstractXMPPConnection>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LOGI(TAG, "ConnectionHelper Connection onError\n");
/**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
}
@Override
public void onNext(AbstractXMPPConnection connection) {
LOGI(TAG, "ConnectionHelper Connection onNext");
// onConnected();
}
});
}
我对链接可观察对象有一些疑问。想象一下这个场景,我有一个连接 Observable,有时我会使用它,但我主要使用 connectWithRetry()
Observable。
我的问题是,如果添加这个会发生什么:
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
connect()
和 connectWithRetry()
?在这种情况下,当我打电话
public void connect并指定一个scheduler,前面的都忽略了?
为什么我得到 NetworkOnMainThreadException
?明确的 observeOn(Schedulers.newThread())
在那里,它不应该给我那个错误
请尝试 Schedulers.io() 可能会解决问题。
我会先解决您的 NetworkOnMainThread
问题。
observeOn(Schedulers.newThread())
意味着 输出将在新线程上被观察到 - 也就是说,您的订阅者 (onComplete/Error/Next
) 中的代码将是 运行 在该线程上。
subscribeOn(AndroidSchedulers.mainThread()
意味着 subscription 将发生在主线程上 - 您创建的可观察对象(connection.connect()
等)中的代码就是 运行 订阅发生时。
所以简单地交换调度程序:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
因此,为了解决您的第一个问题,它们并没有被忽略,它们只是使用不当。希望从这里你可以看到如果你将类似的调用移动到 return observables 的方法中的链中会发生什么:与你已经完成的没有什么不同。这些电话只是在不同的地方。
那么调度程序选择放在哪里?这取决于你。您可以通过 not 在创建可观察对象的方法中调用 subscribeOn
来提高清晰度:
connectionHelper.connectWithRetry(connection)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
但是,如果您觉得无缘无故到处调用它,则可以将 subscribeOn
调用移到您的方法中:
return connect(connection)
.retryWhen(...)
.flatMap(...)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
请注意,这些不必像这样捆绑在一起 - 您可以 subscribeOn
在您的方法中,但将 observeOn
留给任何希望在特定线程上获得结果的调用者.
所以我有这个代码:
public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
return Observable.<AbstractXMPPConnection>create(subscriber -> {
try {
AbstractXMPPConnection connection2 = connection.connect();
if (connection2.isConnected()) {
subscriber.onNext(connection2);
subscriber.onCompleted();
}
} catch (SmackException | IOException | XMPPException e) {
e.printStackTrace();
subscriber.onError(e);
}
})
.doOnError(throwable -> LOGI("111", "Connection OnError called"));
}
public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
return connect(connection)
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
.flatMap(pair -> {
if (pair.second == MAX_LOGIN_TRIES)
return Observable.error(pair.first);
return Observable.timer(pair.second, TimeUnit.SECONDS);
}));
}
public void connect() {
assertTrue("To start a connection to the server, you must first call init() method!",
this.connectionConfig != null);
connectionHelper.connectWithRetry(connection)
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<AbstractXMPPConnection>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LOGI(TAG, "ConnectionHelper Connection onError\n");
/**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
}
@Override
public void onNext(AbstractXMPPConnection connection) {
LOGI(TAG, "ConnectionHelper Connection onNext");
// onConnected();
}
});
}
我对链接可观察对象有一些疑问。想象一下这个场景,我有一个连接 Observable,有时我会使用它,但我主要使用 connectWithRetry()
Observable。
我的问题是,如果添加这个会发生什么:
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
connect()
和 connectWithRetry()
?在这种情况下,当我打电话
public void connect并指定一个scheduler,前面的都忽略了?
为什么我得到 NetworkOnMainThreadException
?明确的 observeOn(Schedulers.newThread())
在那里,它不应该给我那个错误
请尝试 Schedulers.io() 可能会解决问题。
我会先解决您的 NetworkOnMainThread
问题。
observeOn(Schedulers.newThread())
意味着 输出将在新线程上被观察到 - 也就是说,您的订阅者 (onComplete/Error/Next
) 中的代码将是 运行 在该线程上。
subscribeOn(AndroidSchedulers.mainThread()
意味着 subscription 将发生在主线程上 - 您创建的可观察对象(connection.connect()
等)中的代码就是 运行 订阅发生时。
所以简单地交换调度程序:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
因此,为了解决您的第一个问题,它们并没有被忽略,它们只是使用不当。希望从这里你可以看到如果你将类似的调用移动到 return observables 的方法中的链中会发生什么:与你已经完成的没有什么不同。这些电话只是在不同的地方。
那么调度程序选择放在哪里?这取决于你。您可以通过 not 在创建可观察对象的方法中调用 subscribeOn
来提高清晰度:
connectionHelper.connectWithRetry(connection)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
但是,如果您觉得无缘无故到处调用它,则可以将 subscribeOn
调用移到您的方法中:
return connect(connection)
.retryWhen(...)
.flatMap(...)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
请注意,这些不必像这样捆绑在一起 - 您可以 subscribeOn
在您的方法中,但将 observeOn
留给任何希望在特定线程上获得结果的调用者.