修改 via 以接受返回 Future 的方法

Modify via to accept method returning a Future

我正在调用一个使用 via 的方法,如下所示:

myRawStr(id)
.take(1)
.via(myMethod("someString", someSource)
.zip(Source.fromIterator(() => Iterator.from(1)))
.collect {
...
}

myMethod returns 键入 Flow[ByteString, MyValidated[MyClass], NotUsed] 但现在它将 returning Future[Flow[ByteString, MyValidated[MyClass], NotUsed]]注意 : 未来)

但是这样做会导致我在 via 上出现编译错误。错误状态:

[error]  found   : [as, mat, ec]scala.concurrent.Future[akka.stream.scaladsl.Flow[akka.util.ByteString,MyValidated[MyClass],akka.NotUsed]]
[error]     (which expands to)  [as, mat, ec]scala.concurrent.Future[akka.stream.scaladsl.Flow[akka.util.ByteString,scala.util.Either[List[ValidationError],MyClass],akka.NotUsed]]
[error]  required: akka.stream.Graph[akka.stream.FlowShape[akka.util.ByteString,?],?]
[error]       .via(myMethod("someString", someSource))
[error]

我如何通过在流程中添加另一个步骤来修改它以接受 Future return 非未来?

via 接受 Flow 作为参数。您要么必须将 MyMethod 更改为 return 一个 Flow 而不是 Future。或者,您可以使用 mapAsyc 而不是使用 via,它使用 return 是 Future (MyMethod) 的方法来映射您的流程:

https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/mapAsync.html