如何从 akka Source[ByteString, _] 中提取 Future[String]?
How do I extract a Future[String] from an akka Source[ByteString, _]?
我正在尝试使用 akka 流对文件进行流式传输,运行 遇到一个小问题,将流的结果提取到 Future[String]:
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
var fileString = ""
val sink = Sink.foreach[ByteString](byteString => fileString =
fileString.concat(byteString.decodeString("US-ASCII")))
source.runWith(sink)
}
我遇到编译错误:
Expression of type Future[Done] does not conform to expected type Future[String]
任何人都可以帮助我了解我做错了什么以及我需要做什么来提取流的结果吗?
如果我猜对了,您想将整个文件内容流式传输到一个字符串中。这最好用 Sink.fold
来实现,而不是 Sink.foreach
。示例如下。
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
val sink = Sink.fold[String, ByteString]("") { case (acc, str) =>
acc + str.decodeString("US-ASCII")
}
source.runWith(sink)
}
您可能已经意识到这一点,但您的文件需要适合内存才能让您的程序正确运行。
如果您查看 Sink.foreach
的定义,您会发现评估类型是 Sink[T, Future[Done]]
这意味着元素的计算结果会发生什么并不重要流。以下是定义:
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
另一方面,Sink.fold
的定义计算出 Future[U]
是 U
zero
的类型。换句话说,您可以定义处理结束时未来的类型。
以下是Sink.fold
的定义(和实现):
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
根据上面的实现,您可以看到要在具体化中保留的类型是 Future[U]
,因为 Keep.right
的含义类似于:"I don't care if the elements coming in are T
s (or ByteString
in your case) I (the stream) will give you U
s (or String
in your case) .. when I'm done (in a Future
)"
以下是您的案例的工作示例,将 Sink.foreach
替换为 Sink.fold
并将整个表达式计算为 Future[String]
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
var fileString = ""
//def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
val sinkForEach: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString](byteString => fileString =
fileString.concat(byteString.decodeString("US-ASCII")))
/*
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
*/
val sinkFold: Sink[ByteString, Future[String]] = Sink.fold("") { case (acc, str) =>
acc + str
}
val res1: Future[Done] = source.runWith(sinkForEach)
val res2: Future[String] = source.runWith(sinkFold)
res2
}
我正在尝试使用 akka 流对文件进行流式传输,运行 遇到一个小问题,将流的结果提取到 Future[String]:
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
var fileString = ""
val sink = Sink.foreach[ByteString](byteString => fileString =
fileString.concat(byteString.decodeString("US-ASCII")))
source.runWith(sink)
}
我遇到编译错误:
Expression of type Future[Done] does not conform to expected type Future[String]
任何人都可以帮助我了解我做错了什么以及我需要做什么来提取流的结果吗?
如果我猜对了,您想将整个文件内容流式传输到一个字符串中。这最好用 Sink.fold
来实现,而不是 Sink.foreach
。示例如下。
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
val sink = Sink.fold[String, ByteString]("") { case (acc, str) =>
acc + str.decodeString("US-ASCII")
}
source.runWith(sink)
}
您可能已经意识到这一点,但您的文件需要适合内存才能让您的程序正确运行。
如果您查看 Sink.foreach
的定义,您会发现评估类型是 Sink[T, Future[Done]]
这意味着元素的计算结果会发生什么并不重要流。以下是定义:
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
另一方面,Sink.fold
的定义计算出 Future[U]
是 U
zero
的类型。换句话说,您可以定义处理结束时未来的类型。
以下是Sink.fold
的定义(和实现):
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
根据上面的实现,您可以看到要在具体化中保留的类型是 Future[U]
,因为 Keep.right
的含义类似于:"I don't care if the elements coming in are T
s (or ByteString
in your case) I (the stream) will give you U
s (or String
in your case) .. when I'm done (in a Future
)"
以下是您的案例的工作示例,将 Sink.foreach
替换为 Sink.fold
并将整个表达式计算为 Future[String]
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
var fileString = ""
//def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
val sinkForEach: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString](byteString => fileString =
fileString.concat(byteString.decodeString("US-ASCII")))
/*
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
*/
val sinkFold: Sink[ByteString, Future[String]] = Sink.fold("") { case (acc, str) =>
acc + str
}
val res1: Future[Done] = source.runWith(sinkForEach)
val res2: Future[String] = source.runWith(sinkFold)
res2
}