RxJava:链式异步回调式网络服务

RxJava: Chain asynchronous callback-style webservices

我有一些数据不断从硬件设备发出。如果有足够的数据到达,则需要首先将此数据连续发送到 Web 服务 A returns 一段时间后的结果流。然后每个结果必须在到达 web 服务 B 后立即转发,如果来自 A 的足够结果已经到达,则在一段时间后 returns 一个不同的结果流。每个网络服务都有一个异步回调样式 API。在第一次将数据发送到每个 Web 服务之前,还需要进行一些连接设置。

如何将其映射到 RxJava?

flatMapconcatMap 是异步链接的主要工具。

您需要将网络服务包装到 Future 中。并将您的硬件设备放入 Observable 源。然后就这么简单:

class WebServices {
    Future<Response1> callService1(parameters) { ... }
    Future<Response2> callService2(parameters) { ... }
}

hardwareSource
    .flatMap(v -> Observable.fromFuture(callService1(...)))
    .flatMap(r1 -> Observable.fromFuture(callService2(...)))
    .subscribe(r2 -> System.out.println(r2));

万一 web 服务接收和发送 系列 消息,它们应该被包装到 Observable 中。处理管道看起来像:

class WebServices {
    Observable<Response1> sendToService1(parameters) { ... }
    Observable<Response2> sendToService2(parameters) { ... }
}

hardwareSource
    .flatMap(v -> sendToService1(...))
    .flatMap(r1 -> sendToService2(...))
    .subscribe(r2 -> System.out.println(r2));

如果 Web 服务的传入和传出流不严格关联(响应与请求不直接关联),那么我会将这些服务实现为 类 公开 ObserverObservable 接口。

// wire them up
hardwareSource.getObservable()
    .subscribe(webService1.getObserver());
webService1.getObservable()
    .subscribe(webService2.getObserver());
webService2.getObservable()
    .subscribe(resultHandler);

// initiate connections
webService2.connect()
webService1.connect()
hardwareSource.connect()