RxJava 等待单个事件,RxJava

RxJava wait for single Event, RxJava

您如何等待单个值超时出现在 observable 上?

我正在寻找类似的东西:

Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
Optional<Acknowledgement> ack = acknowledgementObservable.getFirst(100, TimeUnit.MILLISECONDS);

首先,将 Observable 转换为 CompletableFuture,如 Converting between Completablefuture and Observable:

中所述
Observable<Acknowledgement> acknowledgementObservable;
port.send(new Message());
CompletableFuture<T> future = new CompletableFuture<>();
acknowledgementObservable
    .doOnError(future::completeExceptionally)
    .single()
    .forEach(future::complete);

然后,使用超时等待事件:

Acknowledgement ack = future.get(100, TimeUnit.MILLISECONDS);

如果超时则抛出TimeoutException

您可以通过添加 .timeout() 和 .onErrorComplete() .blockingGet()

所以在这个例子中它将是:

    Acknowledgement ack = acknowledgementObservable
                              .firstElement()
                              .timeout(3, TimeUnit.SECONDS)
                              .onErrorComplete()
                              .blockingGet();

如果超时,ack 将为空。