使用 akka-stream 过滤异步

Filter async with akka-stream

是否有与 mapAsync() 方法等效的方法,但用于 filter

这是一个使用伪代码的例子:

val filter: T => Future[Boolean] = /.../

source.filter(filter).runWith(/.../)
       ^^^^^^

谢谢

我不认为 FlowSource 的直接方法具有您正在寻找的功能,但可用方法的组合会得到您想要的:

def asyncFilter[T](filter: T => Future[Boolean], parallelism : Int = 1)
                  (implicit ec : ExecutionContext) : Flow[T, T, _] =
  Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
         .filter(_._1)
         .map(_._2)