将 Akka 源代码转换为 RxJava2 Flowable?
Converting Akka Sources to RxJava2 Flowable?
我目前正在使用以下代码将 Akka 源(例如从使用 Akka 的 FileIO 读取文件接收到的)转换为 RxJava2 Flowable:
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
final Publisher<ByteString> uncompressedData =
data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
return Flowable.fromPublisher(uncompressedData)
.map(bytes -> Buffer.buffer(bytes.toArray()));
}
我对这个(可行的)解决方案的问题是,至少就我目前的理解而言,.runWith()
方法调用已经 运行 代码,即收集所有数据从给定的 Source,缓冲它然后将它放入 Publisher。此时有什么办法可以 运行 它吗?我只想在此时定义转换而无需实体化器,并且只有 运行 稍后订阅 Flowable 后的所有内容。
谢谢!
使用 defer(旁注:我不得不多次这样做,因为 Akka Sources 是一次性的):
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
return Flowable.defer(() -> data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer)
).map(bytes -> Buffer.buffer(bytes.toArray()));
}
我目前正在使用以下代码将 Akka 源(例如从使用 Akka 的 FileIO 读取文件接收到的)转换为 RxJava2 Flowable:
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
final Publisher<ByteString> uncompressedData =
data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
return Flowable.fromPublisher(uncompressedData)
.map(bytes -> Buffer.buffer(bytes.toArray()));
}
我对这个(可行的)解决方案的问题是,至少就我目前的理解而言,.runWith()
方法调用已经 运行 代码,即收集所有数据从给定的 Source,缓冲它然后将它放入 Publisher。此时有什么办法可以 运行 它吗?我只想在此时定义转换而无需实体化器,并且只有 运行 稍后订阅 Flowable 后的所有内容。
谢谢!
使用 defer(旁注:我不得不多次这样做,因为 Akka Sources 是一次性的):
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
return Flowable.defer(() -> data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer)
).map(bytes -> Buffer.buffer(bytes.toArray()));
}