将 Source[ByteString, Any] 转换为 Source[ByteString, IOResult]

Convert Source[ByteString, Any] to Source[ByteString, IOResult]

我有一个:

val fileStream: Source[ByteString, Any] = Source.single(ByteString.fromString("Hello"))

这个Source[ByteString, Any]类型来自akka fileUpload指令:

https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/file-upload-directives/fileUpload.html

我可以将其转换为 Source[ByteString, IOResult],或者执行一些类似于 Source.single(ByteString.fromString("Hello")) 的其他操作,从而 return 从字符串中获取 Source[ByteString, IOResult] 吗?

我可以创建一个 IO 结果:

val ioResult: IOResult = IOResult.createSuccessful(1L)

和一个 ByteString:

val byteString: ByteString = ByteString.fromString("Hello")

所以现在我只需要它们作为 Source[ByteString, IOResult]

注意,这只是一个单元测试,我正在测试一个 return 是 Source[ByteString, IOResult] 的函数,所以我想创建一个实例(而不必创建一个文件)来断言函数是 returning 正确的 ByteString,我真的不关心 Source.

IOResult 部分

具体到文件上传指令

我怀疑 fileUpload 作者将物化类型保留为 Any 以允许将来更改 API。通过将其保留为 Any,他们以后可以将其更改为他们真正想要确定的类型。

因此,即使您能够转换为 IOResult 如果您升级 akka 版本并且类型已更改,您也可能会在以后遇到问题...

物化一般

Source 类型参数中的第二个类型表示流将"materialize" 成什么。使用您的示例代码,并将 Any 修改为 the actual type NotUsed,我们可以显示流的整个生命周期:

val notUsed : NotUsed = Source.single(ByteString.fromString("Hello"))
                              .toMat(Sink.ignore)(Keep.left)
                              .run()

如您所见,当流 运行 时,它会变成实际值(即物化)。在上面的例子中,值的类型是 NotUsed。这是因为您无法对源自单个值的流执行太多操作。

将该流与 a stream that operates on a file 进行对比:

val file = Paths.get("example.csv")

val fileIOResult: Future[IOResult] = FileIO.fromPath(file)
                                           .to(Sink.ignore)
                                           .run()

在这种情况下,流正在读取文件的内容并将其流式传输到 Sink。在这里了解文件读取是否有任何错误会很有用。为了能够找出文件读取的进展情况,流被具体化为 Future[IOResult],您可以使用它来获取有关文件读取的信息:

fileIOResult foreach { ioResult =>
  println(s"read ${ioResult.count} bytes from file")
}

因此,"convert" 一个 Any 变成一个 IOResult...

真的没有意义