Akka Streams:初始化和处置接收器资源的最佳实践
Akka Streams: Best practice to initialise and dispose resources of a sink
我有一个场景,我想将图表的结果写入 CSV。
这包括文件的创建、文件编写器的初始化 (I'm using this library),最后,在流完成后,我想再次 dispose/close 编写器。
理想情况下,我想将此逻辑封装在接收器中,但我想知道添加初始化和处置逻辑的最佳实践/挂钩。
要使用 Akka Streams 将 CSV 内容写入文件,请使用 Alpakka's CSV connector and the FileIO
实用程序。这是一个简单的例子:
val destinationPath: Path = ???
Source.single(ByteString("""header1,header2,header3
|1,2,3
|4,5,6""".stripMargin))
.via(CsvParsing.lineScanner())
.runWith(FileIO.toPath(destinationPath))
请注意,FileIO.toPath
可选择采用 OpenOption
设置。例如,如果目标文件不存在,您可以规定创建该文件:
// ...
.runWith(
FileIO.toPath(destinationPath, Set(StandardOpenOption.WRITE, StandardOpenOption.CREATE)))
关于文件相关底层资源的清理,如果你使用FileIO
,Akka Streams会在流完成时进行清理。
给定任何类型的资源,而不仅仅是一个文件,它消耗数据元素并且 needs to be closed:
type Data = ???
trait DataConsumer extends Function1[Data, Unit] with AutoCloseable
可以使用 Flow
的 watchTermination
方法创建一个 Sink
并在完成后关闭消费者,该方法可以在前面加上:
def createDataConsumerSink(dataConsumer: DataConsumer) : Sink[Data,_] =
Flow[Data].watchTermination()( (_, f) => f foreach (_ => dataConsumer.close()))
.to(Sink.foreach[Data](dataConsumer.apply))
我有一个场景,我想将图表的结果写入 CSV。 这包括文件的创建、文件编写器的初始化 (I'm using this library),最后,在流完成后,我想再次 dispose/close 编写器。
理想情况下,我想将此逻辑封装在接收器中,但我想知道添加初始化和处置逻辑的最佳实践/挂钩。
要使用 Akka Streams 将 CSV 内容写入文件,请使用 Alpakka's CSV connector and the FileIO
实用程序。这是一个简单的例子:
val destinationPath: Path = ???
Source.single(ByteString("""header1,header2,header3
|1,2,3
|4,5,6""".stripMargin))
.via(CsvParsing.lineScanner())
.runWith(FileIO.toPath(destinationPath))
请注意,FileIO.toPath
可选择采用 OpenOption
设置。例如,如果目标文件不存在,您可以规定创建该文件:
// ...
.runWith(
FileIO.toPath(destinationPath, Set(StandardOpenOption.WRITE, StandardOpenOption.CREATE)))
关于文件相关底层资源的清理,如果你使用FileIO
,Akka Streams会在流完成时进行清理。
给定任何类型的资源,而不仅仅是一个文件,它消耗数据元素并且 needs to be closed:
type Data = ???
trait DataConsumer extends Function1[Data, Unit] with AutoCloseable
可以使用 Flow
的 watchTermination
方法创建一个 Sink
并在完成后关闭消费者,该方法可以在前面加上:
def createDataConsumerSink(dataConsumer: DataConsumer) : Sink[Data,_] =
Flow[Data].watchTermination()( (_, f) => f foreach (_ => dataConsumer.close()))
.to(Sink.foreach[Data](dataConsumer.apply))