Akka - 从客户端到节点分块共享大文件
Akka - Share a large file in chunks from client to nodes
我有比较多的 Akka 节点,用 Scala 编码,在不同的集群中,远程通信,最初使用的是一个中央 ClusterClient。
我的目标是通过为每个节点提供文件的一个子集(而不是整个文件),将一个大文件(多个 GB)从客户端的文件系统共享到所有不同的节点。
为此使用什么正确的 Akka 概念? Akka 的分布式数据明确指出它不适用于大数据,而且它似乎将整个文件复制到节点而不仅仅是一个块。看起来 Akka 的 Streams IO 是可行的方法,但我在网上找不到任何实现此目标的示例。
谢谢!
可以使用 streaming FileIO
功能读取 "large file":
val file = Paths.get("example.csv")
val fileSource : Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)
然后可以解析此源以创建 "subsets of the file"。这个问题并不具体说明如何创建子集,因此假设我们要向每个节点发送 256
行。 Framing
can be used to do the parsing and grouped
将创建块:
val separator = ByteString("\n")
val maxLineLength = 1024
val chunkSize = 256
val lineParser : Flow[ByteString, ByteString, _] =
Framing
.delimiter(separator, maximumFrameLength = maxLineLength, allowTruncation= false)
.grouped(chunkSize)
然后可以将这些块分派到远程节点。问题没有具体说明如何与节点通信,因此使用了存根函数:
val sendChunk : Seq[ByteString] => Unit = ???
val chunkSink : Sink[Seq[ByteString], _] =
Sink[Seq[ByteString]].foreach(sendChunk)
fileSource
.via(lineParser)
.to(chunkSink)
.run()
我有比较多的 Akka 节点,用 Scala 编码,在不同的集群中,远程通信,最初使用的是一个中央 ClusterClient。 我的目标是通过为每个节点提供文件的一个子集(而不是整个文件),将一个大文件(多个 GB)从客户端的文件系统共享到所有不同的节点。
为此使用什么正确的 Akka 概念? Akka 的分布式数据明确指出它不适用于大数据,而且它似乎将整个文件复制到节点而不仅仅是一个块。看起来 Akka 的 Streams IO 是可行的方法,但我在网上找不到任何实现此目标的示例。
谢谢!
可以使用 streaming FileIO
功能读取 "large file":
val file = Paths.get("example.csv")
val fileSource : Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)
然后可以解析此源以创建 "subsets of the file"。这个问题并不具体说明如何创建子集,因此假设我们要向每个节点发送 256
行。 Framing
can be used to do the parsing and grouped
将创建块:
val separator = ByteString("\n")
val maxLineLength = 1024
val chunkSize = 256
val lineParser : Flow[ByteString, ByteString, _] =
Framing
.delimiter(separator, maximumFrameLength = maxLineLength, allowTruncation= false)
.grouped(chunkSize)
然后可以将这些块分派到远程节点。问题没有具体说明如何与节点通信,因此使用了存根函数:
val sendChunk : Seq[ByteString] => Unit = ???
val chunkSink : Sink[Seq[ByteString], _] =
Sink[Seq[ByteString]].foreach(sendChunk)
fileSource
.via(lineParser)
.to(chunkSink)
.run()