将 Akka 源代码转换为 RxJava2 Flowable?

Converting Akka Sources to RxJava2 Flowable?

我目前正在使用以下代码将 Akka 源(例如从使用 Akka 的 FileIO 读取文件接收到的)转换为 RxJava2 Flowable:

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
        Flow<ByteString, ByteString, NotUsed> compType) {
    final Publisher<ByteString> uncompressedData =
         data.via(compType)
             .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
    return Flowable.fromPublisher(uncompressedData)
       .map(bytes -> Buffer.buffer(bytes.toArray()));
}

我对这个(可行的)解决方案的问题是,至少就我目前的理解而言,.runWith() 方法调用已经 运行 代码,即收集所有数据从给定的 Source,缓冲它然后将它放入 Publisher。此时有什么办法可以 运行 它吗?我只想在此时定义转换而无需实体化器,并且只有 运行 稍后订阅 Flowable 后的所有内容。

谢谢!

使用 defer(旁注:我不得不多次这样做,因为 Akka Sources 是一次性的):

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
        Flow<ByteString, ByteString, NotUsed> compType) {

    return Flowable.defer(() -> data.via(compType)
         .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer) 
    ).map(bytes -> Buffer.buffer(bytes.toArray()));
}