在 ConnectableObservable 中执行异步查询的线程问题
Thread problem executing async query inside ConnectableObservable
所以我有一个接收字符串的 ConnectableObservable,我需要为每个字符串执行查询(异步)并等待结果返回
目前我正在使用闩锁等待查询返回,但可观察到它卡住了或者只是完成了第一个字符串而没有继续其他字符串。
我已经隔离了这个例子中的问题
private Subscriber<? super String> stringStreamInput;
ConnectableObservable<String> stringStream = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
stringStreamInput = subscriber;
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String perro) {
return getPerroDetails(perro);
}
})
.publish();
stringStream.connect();
调用查询模拟的方法
private String getPerroDetails(final String perro) {
Log.d("Hey", "Perro " + perro);
//Simulating the query to get Perro details
String newPerro = executeQueryForPerro(perro);
Log.d("Hey", "NewPerro " + newPerro);
return newPerro;
}
使用 CountDownLatch 模拟查询
private String executeQueryForPerro(final String perro) {
final String[] newPerro = new String[1];
final CountDownLatch latch = new CountDownLatch(1);
//SIMULATION OF THE QUERY
Handler handler = new Handler();
handler.postDelayed(new Runnable() {
@Override
public void run() {
newPerro[0] = perro + " reloaded ";
latch.countdown();
}
}, 1000);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return newPerro[0];
}
测试
String[] perros = new String[]{"Beagle", "Alaska", "Chihuahua", "PitBull", "RedBull"};
for (String perro : perros) {
stringStreamInput.onNext(perro);
}
我可以采取哪些其他方法来确保 Query 正确执行并等待结果返回以转到下一步?
有什么神奇的 Rx 解决方案吗?
非常感谢
当你将 RxJava 的世界与显式线程管理混合使用时,你会经常 运行 遇到问题,通常是在测试时。 RxJava 工具包非常灵活,几乎可以提供您需要的一切。
您的数据源需要类似于 Subject
。在您的代码中,您直接向订阅者发送值,完全绕过您的观察者链,这可能不是您想要的。
PublishSubject<String> stringStreamInput = PublishSubject.create();
然后,将 getPerroDetails()
更改为 return 和最终将提供结果的 Observable
:
Observable<String> getPerroDetails(final String perro);
然后可以使用flatMap()
运算符获取详细信息:
Observable<String> stringStream = stringStreamInput
.observeOn(Schedulers.io())
.flatMap( input -> getPerroDetails(input) )
.publish();
stringStream.connect();
getPerroDetails()
是如何实现的?您使用 RxJava 运算符之一来调用异步方法:
Observable<String> getPerroDetails(String perro)
{
return Observable
.just( perro + " reloaded" )
.delay(1000, TimeUnit.MILLISECONDS));
}
所以我有一个接收字符串的 ConnectableObservable,我需要为每个字符串执行查询(异步)并等待结果返回
目前我正在使用闩锁等待查询返回,但可观察到它卡住了或者只是完成了第一个字符串而没有继续其他字符串。
我已经隔离了这个例子中的问题
private Subscriber<? super String> stringStreamInput;
ConnectableObservable<String> stringStream = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
stringStreamInput = subscriber;
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String perro) {
return getPerroDetails(perro);
}
})
.publish();
stringStream.connect();
调用查询模拟的方法
private String getPerroDetails(final String perro) {
Log.d("Hey", "Perro " + perro);
//Simulating the query to get Perro details
String newPerro = executeQueryForPerro(perro);
Log.d("Hey", "NewPerro " + newPerro);
return newPerro;
}
使用 CountDownLatch 模拟查询
private String executeQueryForPerro(final String perro) {
final String[] newPerro = new String[1];
final CountDownLatch latch = new CountDownLatch(1);
//SIMULATION OF THE QUERY
Handler handler = new Handler();
handler.postDelayed(new Runnable() {
@Override
public void run() {
newPerro[0] = perro + " reloaded ";
latch.countdown();
}
}, 1000);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return newPerro[0];
}
测试
String[] perros = new String[]{"Beagle", "Alaska", "Chihuahua", "PitBull", "RedBull"};
for (String perro : perros) {
stringStreamInput.onNext(perro);
}
我可以采取哪些其他方法来确保 Query 正确执行并等待结果返回以转到下一步?
有什么神奇的 Rx 解决方案吗?
非常感谢
当你将 RxJava 的世界与显式线程管理混合使用时,你会经常 运行 遇到问题,通常是在测试时。 RxJava 工具包非常灵活,几乎可以提供您需要的一切。
您的数据源需要类似于 Subject
。在您的代码中,您直接向订阅者发送值,完全绕过您的观察者链,这可能不是您想要的。
PublishSubject<String> stringStreamInput = PublishSubject.create();
然后,将 getPerroDetails()
更改为 return 和最终将提供结果的 Observable
:
Observable<String> getPerroDetails(final String perro);
然后可以使用flatMap()
运算符获取详细信息:
Observable<String> stringStream = stringStreamInput
.observeOn(Schedulers.io())
.flatMap( input -> getPerroDetails(input) )
.publish();
stringStream.connect();
getPerroDetails()
是如何实现的?您使用 RxJava 运算符之一来调用异步方法:
Observable<String> getPerroDetails(String perro)
{
return Observable
.just( perro + " reloaded" )
.delay(1000, TimeUnit.MILLISECONDS));
}