具有错误累积的异步处理的函数签名

Function signature for async processing with errors accumulation

假设我有一个函数fab: A => Future[B]。现在我需要编写新函数 foo 来处理 Seq[A] 并累积所有错误。这就是我不能使用 Future.traverse 的原因,因为它 "fails fast" 并且不会累积错误。

foo 收到 Seq[A] 并且应该 return 一个 Future。对于输入的每个元素 Seq[A],客户端应该得到 B 或异常。这个函数的签名是什么?

我有 ZIO 的解决方案。

我添加了这个假函数:

  def fab(implicit ec: ExecutionContext): Int => Future[String] = i => Future(
    if (i % 3 == 0)
      throw new IllegalArgumentException(s"bad $i")
    else
      s"$i"
  )

现在我们为每个Int和运行fab创建一个流

  val stream =
    ZStream.fromIterable(Seq(1, 2, 3, 4, 5))
      .map(in => Task.fromFuture(implicit ec => fab(ec)(in)))

  val sink = Sink.collectAll[Task[String]]

现在我们收集成功和失败的案例:

  val collect: ZIO[zio.ZEnv, Throwable, (List[String], List[Throwable])] = for {
    strs <- stream.run(sink)
    successes <- Task.collectAllSuccesses(strs)
    failures <- ZIO.collectAllSuccesses(strs.map(_.flip))
  } yield (successes, failures)

运行 并打印:

  new DefaultRuntime {}
    .unsafeRun(
      collect
        .tapError { ex => zio.console.putStrLn(s"There was an exception: ${ex.getMessage}") }
        .tap { case (successes, failures) => zio.console.putStrLn(s"($successes, $failures)") }
        .fold(_ => -1, _ => 0)
    )

打印我们:

(List(1, 2, 4, 5), List(java.lang.IllegalArgumentException: bad 3))

如果您需要更多解释,请告诉我 - 如果 ZIO 是一个选项。

要根据需要定义 foo,请考虑在将 fab 应用到输入列表的各个元素后,在 map/recover 之上使用 Future.sequence,如下所示:

import scala.concurrent.{ Future, ExecutionContext }

def foo[A, B](ls: List[A])(fab: A => Future[B])(implicit ec: ExecutionContext):
    Future[List[Either[Throwable, B]]] =
  Future.sequence(ls.map(fab).map(_.map(Right(_)).recover{ case e => Left(e) }))

请注意,不可变的 ListSeq 更受欢迎,因此在这里使用。如有必要,将其更改为 Seq

测试foo

implicit val ec = ExecutionContext.global

def fab(s: String): Future[Int] = Future{ 10 / s.length }

val ls = List("abcd", "", "xx", "")

foo(ls)(fab)
// res1: Future[List[Either[Throwable, Int]]] = Future(Success(List(
//   Right(2),
//   Left(java.lang.ArithmeticException: / by zero),
//   Right(5),
//   Left(java.lang.ArithmeticException: / by zero)
// )))