Akka 流文件处理和终止

Akka Streams File Handling and Termination

我有以下读取 CSV 文件并向控制台打印内容的片段:

def readUsingAkkaStreams = {

    import java.io.File
    import akka.stream.scaladsl._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import java.security.MessageDigest


    implicit val system = ActorSystem("Sys")
    implicit val materializer = ActorMaterializer()

    val file = new File("/path/to/csv/file.csv")

    val fileSource = FileIO.fromFile(file, 65536)

    val flow = fileSource.map(chunk => chunk.utf8String)

    flow.to(Sink.foreach(println(_))).run
  }

我现在对此有一些疑问:

  1. chunksize 是以字节为单位的大小。内部如何处理?我的意思是我最终会遇到一个块可能只包含一行中的部分元素的情况吗?

  2. 这个流如何终止?现在它没有!我想让它知道它已经完全读取了文件并且应该触发停止信号!有没有一种机制可以做到这一点?

编辑 1: 根据下面 post 的建议,我收到一条错误消息,如屏幕截图所示!

编辑 2:

通过将 maximumFrameLength 设置为与最大块大小 65536 的大小相匹配,设法消除了错误。

val file = new File("/path/to/csf/file.csv")
val chunkSize = 65536
val fileSource = FileIO.fromFile(file, chunkSize).via(Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = chunkSize,
  allowTruncation = true))

1.As 根据 docs:

发出的元素是 chunkSize 大小的 ByteString 元素,最后一个元素除外,它的大小将达到 chunkSize。

FileIO 源将新行视为任何其他字符。所以是的,您可能会在一个块中看到 CSV 行的第一部分,在另一个块中看到第二部分。如果这不是您想要的,您可以使用 Framing.delimiter 重构 ByteString 流程的分块方式(有关详细信息,请参阅 docs)。

附带说明,FileIO.fromFile 已被弃用,最好使用 FileIO.fromPath

例如:

val fileSource = FileIO.fromPath(...)
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))

2.the 接收器具体化为 Future 您可以映射到流终止时执行某些操作:

val result: Future[IOResult] = flow.runWith(Sink.foreach(println(_)))

result.onComplete(...)