RxJava 等待单个事件,RxJava
RxJava wait for single Event, RxJava
您如何等待单个值超时出现在 observable 上?
我正在寻找类似的东西:
Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
Optional<Acknowledgement> ack = acknowledgementObservable.getFirst(100, TimeUnit.MILLISECONDS);
首先,将 Observable
转换为 CompletableFuture
,如 Converting between Completablefuture and Observable:
中所述
Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
CompletableFuture<T> future = new CompletableFuture<>();
acknowledgementObservable
.doOnError(future::completeExceptionally)
.single()
.forEach(future::complete);
然后,使用超时等待事件:
Acknowledgement ack = future.get(100, TimeUnit.MILLISECONDS);
如果超时则抛出TimeoutException
。
您可以通过添加 .timeout() 和 .onErrorComplete() .blockingGet()
所以在这个例子中它将是:
Acknowledgement ack = acknowledgementObservable
.firstElement()
.timeout(3, TimeUnit.SECONDS)
.onErrorComplete()
.blockingGet();
如果超时,ack 将为空。
您如何等待单个值超时出现在 observable 上?
我正在寻找类似的东西:
Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
Optional<Acknowledgement> ack = acknowledgementObservable.getFirst(100, TimeUnit.MILLISECONDS);
首先,将 Observable
转换为 CompletableFuture
,如 Converting between Completablefuture and Observable:
Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
CompletableFuture<T> future = new CompletableFuture<>();
acknowledgementObservable
.doOnError(future::completeExceptionally)
.single()
.forEach(future::complete);
然后,使用超时等待事件:
Acknowledgement ack = future.get(100, TimeUnit.MILLISECONDS);
如果超时则抛出TimeoutException
。
您可以通过添加 .timeout() 和 .onErrorComplete() .blockingGet()
所以在这个例子中它将是:
Acknowledgement ack = acknowledgementObservable
.firstElement()
.timeout(3, TimeUnit.SECONDS)
.onErrorComplete()
.blockingGet();
如果超时,ack 将为空。