Akka Streams:初始化和处置接收器资源的最佳实践

Akka Streams: Best practice to initialise and dispose resources of a sink

我有一个场景,我想将图表的结果写入 CSV。 这包括文件的创建、文件编写器的初始化 (I'm using this library),最后,在流完成后,我想再次 dispose/close 编写器。

理想情况下,我想将此逻辑封装在接收器中,但我想知道添加初始化和处置逻辑的最佳实践/挂钩。

要使用 Akka Streams 将 CSV 内容写入文件,请使用 Alpakka's CSV connector and the FileIO 实用程序。这是一个简单的例子:

val destinationPath: Path = ???

Source.single(ByteString("""header1,header2,header3
                           |1,2,3
                           |4,5,6""".stripMargin))
  .via(CsvParsing.lineScanner())
  .runWith(FileIO.toPath(destinationPath))

请注意,FileIO.toPath 可选择采用 OpenOption 设置。例如,如果目标文件不存在,您可以规定创建该文件:

// ...
.runWith(
  FileIO.toPath(destinationPath, Set(StandardOpenOption.WRITE, StandardOpenOption.CREATE)))

关于文件相关底层资源的清理,如果你使用FileIO,Akka Streams会在流完成时进行清理。

给定任何类型的资源,而不仅仅是一个文件,它消耗数据元素并且 needs to be closed:

type Data = ???

trait DataConsumer extends Function1[Data, Unit] with AutoCloseable

可以使用 FlowwatchTermination 方法创建一个 Sink 并在完成后关闭消费者,该方法可以在前面加上:

def createDataConsumerSink(dataConsumer: DataConsumer) : Sink[Data,_] = 
  Flow[Data].watchTermination()( (_, f) => f foreach (_ => dataConsumer.close()))
            .to(Sink.foreach[Data](dataConsumer.apply))