使用 akka-stream 过滤异步
Filter async with akka-stream
是否有与 mapAsync()
方法等效的方法,但用于 filter
?
这是一个使用伪代码的例子:
val filter: T => Future[Boolean] = /.../
source.filter(filter).runWith(/.../)
^^^^^^
谢谢
我不认为 Flow
或 Source
的直接方法具有您正在寻找的功能,但可用方法的组合会得到您想要的:
def asyncFilter[T](filter: T => Future[Boolean], parallelism : Int = 1)
(implicit ec : ExecutionContext) : Flow[T, T, _] =
Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
.filter(_._1)
.map(_._2)
是否有与 mapAsync()
方法等效的方法,但用于 filter
?
这是一个使用伪代码的例子:
val filter: T => Future[Boolean] = /.../
source.filter(filter).runWith(/.../)
^^^^^^
谢谢
我不认为 Flow
或 Source
的直接方法具有您正在寻找的功能,但可用方法的组合会得到您想要的:
def asyncFilter[T](filter: T => Future[Boolean], parallelism : Int = 1)
(implicit ec : ExecutionContext) : Flow[T, T, _] =
Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
.filter(_._1)
.map(_._2)