具有错误累积的异步处理的函数签名
Function signature for async processing with errors accumulation
假设我有一个函数fab: A => Future[B]
。现在我需要编写新函数 foo
来处理 Seq[A]
并累积所有错误。这就是我不能使用 Future.traverse 的原因,因为它 "fails fast" 并且不会累积错误。
foo
收到 Seq[A]
并且应该 return 一个 Future
。对于输入的每个元素 Seq[A]
,客户端应该得到 B
或异常。这个函数的签名是什么?
我有 ZIO 的解决方案。
我添加了这个假函数:
def fab(implicit ec: ExecutionContext): Int => Future[String] = i => Future(
if (i % 3 == 0)
throw new IllegalArgumentException(s"bad $i")
else
s"$i"
)
现在我们为每个Int
和运行fab
创建一个流
val stream =
ZStream.fromIterable(Seq(1, 2, 3, 4, 5))
.map(in => Task.fromFuture(implicit ec => fab(ec)(in)))
val sink = Sink.collectAll[Task[String]]
现在我们收集成功和失败的案例:
val collect: ZIO[zio.ZEnv, Throwable, (List[String], List[Throwable])] = for {
strs <- stream.run(sink)
successes <- Task.collectAllSuccesses(strs)
failures <- ZIO.collectAllSuccesses(strs.map(_.flip))
} yield (successes, failures)
运行 并打印:
new DefaultRuntime {}
.unsafeRun(
collect
.tapError { ex => zio.console.putStrLn(s"There was an exception: ${ex.getMessage}") }
.tap { case (successes, failures) => zio.console.putStrLn(s"($successes, $failures)") }
.fold(_ => -1, _ => 0)
)
打印我们:
(List(1, 2, 4, 5), List(java.lang.IllegalArgumentException: bad 3))
如果您需要更多解释,请告诉我 - 如果 ZIO 是一个选项。
要根据需要定义 foo
,请考虑在将 fab
应用到输入列表的各个元素后,在 map/recover
之上使用 Future.sequence
,如下所示:
import scala.concurrent.{ Future, ExecutionContext }
def foo[A, B](ls: List[A])(fab: A => Future[B])(implicit ec: ExecutionContext):
Future[List[Either[Throwable, B]]] =
Future.sequence(ls.map(fab).map(_.map(Right(_)).recover{ case e => Left(e) }))
请注意,不可变的 List
比 Seq
更受欢迎,因此在这里使用。如有必要,将其更改为 Seq
。
测试foo
:
implicit val ec = ExecutionContext.global
def fab(s: String): Future[Int] = Future{ 10 / s.length }
val ls = List("abcd", "", "xx", "")
foo(ls)(fab)
// res1: Future[List[Either[Throwable, Int]]] = Future(Success(List(
// Right(2),
// Left(java.lang.ArithmeticException: / by zero),
// Right(5),
// Left(java.lang.ArithmeticException: / by zero)
// )))
假设我有一个函数fab: A => Future[B]
。现在我需要编写新函数 foo
来处理 Seq[A]
并累积所有错误。这就是我不能使用 Future.traverse 的原因,因为它 "fails fast" 并且不会累积错误。
foo
收到 Seq[A]
并且应该 return 一个 Future
。对于输入的每个元素 Seq[A]
,客户端应该得到 B
或异常。这个函数的签名是什么?
我有 ZIO 的解决方案。
我添加了这个假函数:
def fab(implicit ec: ExecutionContext): Int => Future[String] = i => Future(
if (i % 3 == 0)
throw new IllegalArgumentException(s"bad $i")
else
s"$i"
)
现在我们为每个Int
和运行fab
创建一个流
val stream =
ZStream.fromIterable(Seq(1, 2, 3, 4, 5))
.map(in => Task.fromFuture(implicit ec => fab(ec)(in)))
val sink = Sink.collectAll[Task[String]]
现在我们收集成功和失败的案例:
val collect: ZIO[zio.ZEnv, Throwable, (List[String], List[Throwable])] = for {
strs <- stream.run(sink)
successes <- Task.collectAllSuccesses(strs)
failures <- ZIO.collectAllSuccesses(strs.map(_.flip))
} yield (successes, failures)
运行 并打印:
new DefaultRuntime {}
.unsafeRun(
collect
.tapError { ex => zio.console.putStrLn(s"There was an exception: ${ex.getMessage}") }
.tap { case (successes, failures) => zio.console.putStrLn(s"($successes, $failures)") }
.fold(_ => -1, _ => 0)
)
打印我们:
(List(1, 2, 4, 5), List(java.lang.IllegalArgumentException: bad 3))
如果您需要更多解释,请告诉我 - 如果 ZIO 是一个选项。
要根据需要定义 foo
,请考虑在将 fab
应用到输入列表的各个元素后,在 map/recover
之上使用 Future.sequence
,如下所示:
import scala.concurrent.{ Future, ExecutionContext }
def foo[A, B](ls: List[A])(fab: A => Future[B])(implicit ec: ExecutionContext):
Future[List[Either[Throwable, B]]] =
Future.sequence(ls.map(fab).map(_.map(Right(_)).recover{ case e => Left(e) }))
请注意,不可变的 List
比 Seq
更受欢迎,因此在这里使用。如有必要,将其更改为 Seq
。
测试foo
:
implicit val ec = ExecutionContext.global
def fab(s: String): Future[Int] = Future{ 10 / s.length }
val ls = List("abcd", "", "xx", "")
foo(ls)(fab)
// res1: Future[List[Either[Throwable, Int]]] = Future(Success(List(
// Right(2),
// Left(java.lang.ArithmeticException: / by zero),
// Right(5),
// Left(java.lang.ArithmeticException: / by zero)
// )))