RxJava 运算符像 amb,但只有有效结果
RxJava operator like amb, but only with valid results
我想在 Android
应用程序中自动查找设备。因此,我想进行两次调用,一次使用 Retrofit
进行网络调用,一次使用自定义 SDK 进行非网络调用,同时找出用户正在使用的设备。应用应选择第一个提供有效值的结果。
我使用 RxJava
并像这样使用运算符 amb
进行了尝试:
public Observable<LoginResponse> detectDevice(String username, String pwd) {
return Observable.amb(device1.login(username, pwd), device2.login(username, pwd));
}
如果需要检测的设备是使用网络调用的设备1,这似乎可以正常工作。但是如果应该检测到的是device2,它会return onError()
,因为device1.login()
完成得更快,而amb
占据第一个onNext()
或[=15] =].即使 device2.login() 提供了有效结果,也不会被考虑在内,因为它太慢了。
我的问题是:有没有更好的方法只接受有效响应或其他操作员?我不想使用zip
,因为将来可能会有更多的设备,我不想让用户等到每个设备的登录请求都完成。
你可以试试
Observable.mergeDelayError(device1.login(username, pwd), device2.login(username, pwd)).first()
您可以尝试使用 materialise
operator on any ouput from login
function and see if it is error, then use takeUntil
运算符来静默丢弃任何错误:
List<Observable<LoginResponse>> logins = new ArrayList<>();
logins.add(device1.login(username, pwd));
logins.add(device2.login(username, pwd));
Observable.from(logins)
.materialize()
.takeUntil((observableNotification) -> {
return !observableNotification.isOnError();
}).dematerialize();
如果任何 login
函数都没有响应将 Throwable
抛出到 Subscriber
,则可以添加 timeout
。
JohnWowUs post 启发了我使用 materialize
,但有点不同,这就是我使用的:
public Observable<LoginResponse> detectDevices(String username, String password) {
Observable<Notification<LoginResponse>> deviceOneObservable = device1.login(username, password).timeout(2, TimeUnit.SECDONDS).materialize().take(1);
Observable<Notification<LoginResponse>> deviceTwoObservable = device2.login(username, password).timeout(2, TimeUnit.SECONDS).materialize().take(1);
return Observable
.zip(deviceOneObservable, deviceTwoObservable, new Func2<Notification<LoginResponse>, Notification<LoginResponse>, Pair<Notification<LoginResponse>, Notification<LoginResponse>>>() {
@Override
public Pair<Notification<LoginResponse>, Notification<LoginResponse>> call(Notification<LoginResponse> loginResponseNotification, Notification<LoginResponse> loginResponseNotification2) {
return Pair.create(loginResponseNotification, loginResponseNotification2);
}
})
.flatMap(new Func1<Pair<Notification<LoginResponse>, Notification<LoginResponse>>, Observable<LoginResponse>>() {
@Override
public Observable<LoginResponse> call(Pair<Notification<LoginResponse>, Notification<LoginResponse>> notificationNotificationPair) {
final Notification<LoginResponse> deviceOneNotification = notificationNotificationPair.first;
final Notification<LoginResponse> deviceTwoNotification = notificationNotificationPair.second;
//treat 4 different cases of device detection
//case1: no compatible device was detected
if (deviceOneNotification.isOnError() && deviceTwoNotification.isOnError()) {
return Observable.just(new LoginResponse(DeviceType.UNKNOWN));
//case2: device1 was detected
} else if (deviceOneNotification.isOnNext()) {
return Observable.just(new LoginResponse(DeviceType.DEVICE_ONE));
//case3: device2 was detected
} else if (deviceTwoNotification.isOnNext()) {
return Observable.just(new LoginResponse(DeviceType.DEVICE_TWO));
//case4: error has occurred
} else {
... //error handling
}
}
}
}
我想在 Android
应用程序中自动查找设备。因此,我想进行两次调用,一次使用 Retrofit
进行网络调用,一次使用自定义 SDK 进行非网络调用,同时找出用户正在使用的设备。应用应选择第一个提供有效值的结果。
我使用 RxJava
并像这样使用运算符 amb
进行了尝试:
public Observable<LoginResponse> detectDevice(String username, String pwd) {
return Observable.amb(device1.login(username, pwd), device2.login(username, pwd));
}
如果需要检测的设备是使用网络调用的设备1,这似乎可以正常工作。但是如果应该检测到的是device2,它会return onError()
,因为device1.login()
完成得更快,而amb
占据第一个onNext()
或[=15] =].即使 device2.login() 提供了有效结果,也不会被考虑在内,因为它太慢了。
我的问题是:有没有更好的方法只接受有效响应或其他操作员?我不想使用zip
,因为将来可能会有更多的设备,我不想让用户等到每个设备的登录请求都完成。
你可以试试
Observable.mergeDelayError(device1.login(username, pwd), device2.login(username, pwd)).first()
您可以尝试使用 materialise
operator on any ouput from login
function and see if it is error, then use takeUntil
运算符来静默丢弃任何错误:
List<Observable<LoginResponse>> logins = new ArrayList<>();
logins.add(device1.login(username, pwd));
logins.add(device2.login(username, pwd));
Observable.from(logins)
.materialize()
.takeUntil((observableNotification) -> {
return !observableNotification.isOnError();
}).dematerialize();
如果任何 login
函数都没有响应将 Throwable
抛出到 Subscriber
,则可以添加 timeout
。
JohnWowUs post 启发了我使用 materialize
,但有点不同,这就是我使用的:
public Observable<LoginResponse> detectDevices(String username, String password) {
Observable<Notification<LoginResponse>> deviceOneObservable = device1.login(username, password).timeout(2, TimeUnit.SECDONDS).materialize().take(1);
Observable<Notification<LoginResponse>> deviceTwoObservable = device2.login(username, password).timeout(2, TimeUnit.SECONDS).materialize().take(1);
return Observable
.zip(deviceOneObservable, deviceTwoObservable, new Func2<Notification<LoginResponse>, Notification<LoginResponse>, Pair<Notification<LoginResponse>, Notification<LoginResponse>>>() {
@Override
public Pair<Notification<LoginResponse>, Notification<LoginResponse>> call(Notification<LoginResponse> loginResponseNotification, Notification<LoginResponse> loginResponseNotification2) {
return Pair.create(loginResponseNotification, loginResponseNotification2);
}
})
.flatMap(new Func1<Pair<Notification<LoginResponse>, Notification<LoginResponse>>, Observable<LoginResponse>>() {
@Override
public Observable<LoginResponse> call(Pair<Notification<LoginResponse>, Notification<LoginResponse>> notificationNotificationPair) {
final Notification<LoginResponse> deviceOneNotification = notificationNotificationPair.first;
final Notification<LoginResponse> deviceTwoNotification = notificationNotificationPair.second;
//treat 4 different cases of device detection
//case1: no compatible device was detected
if (deviceOneNotification.isOnError() && deviceTwoNotification.isOnError()) {
return Observable.just(new LoginResponse(DeviceType.UNKNOWN));
//case2: device1 was detected
} else if (deviceOneNotification.isOnNext()) {
return Observable.just(new LoginResponse(DeviceType.DEVICE_ONE));
//case3: device2 was detected
} else if (deviceTwoNotification.isOnNext()) {
return Observable.just(new LoginResponse(DeviceType.DEVICE_TWO));
//case4: error has occurred
} else {
... //error handling
}
}
}
}