如何摆脱 Akka Stream Flow 阶段的 "extra" Future?

How to get rid of "extra" Future in Akka Stream Flow stages?

我正在使用 Akka Streams 并尝试使用 Flow 阶段来以最有效的方式描述整个图。在某些阶段,我通过 ask 模式向演员发送消息。

当然,当我使用 ask 模式时,我需要使用 mapTo 才能获得预期的类型以进行进一步处理。

这是一个例子:

val runnableGraph = Source.single(CheckEntity(entity))
  .map { check =>
    (postgresActor ? check)
      .mapTo[CheckEntityResult]
      .map {
        case failure: PostgresFailure => Left(failure.message)
        case pEntity: PEntity => Right(check)
    }
  }
  .map {
    _.map {
      case Left(msg) => Future(Left(msg))
      case Right(check) =>
        (redisActor ? pEntity)
          .mapTo[CheckEntityResult]
          .map {
            case failure: RedisFailure => Left(failure.message)
            case rEntity: REntity => Right(rEntity)
          }
    }
  }
  .toMat(Sink.head)(Keep.right)

//The result's type is Future[Future[Either[String, Entity]]]
val futureResult = runnableGraph.run()

如何去除阶段之间的嵌套 Future

使 CheckEntity 元素更容易通过流传播的一个想法是更改 CheckEntityResult class 以包含相应的 CheckEntity 实例。这看起来像这样:

abstract class CheckEntityResult(entity: CheckEntity) extends Entity

case class PEntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class PostgresFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)

case class REntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class RedisFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)

然后,在调整您的 actor 来处理这些消息后,您可以使用 Source # askmapAsync(根据需要调整并行度级别)与 actor 交互并避免嵌套 Future 在物化值中:

implicit val askTimeout = Timeout(5.seconds)

val runnableGraph = Source.single(CheckEntity(entity))
  .ask[CheckEntityResult](parallelism = 3)(postgresActor)
  .map {
    case PostgresFailure(_, msg) => msg
    case PEntity(e) => e
  }
  .mapAsync(parallelism = 3) {
    case failureMsg: String => Future.successful(failureMsg)
    case e: CheckEntity => (redisActor ? e).mapTo[CheckEntityResult]
  }
  .map {
    case failureMsg: String => Left(failureMsg)
    case RedisFailure(_, msg) => Left(msg)
    case r: REntity => Right(r)
  }
  .toMat(Sink.head)(Keep.right)

val futureResult = runnableGraph.run() // Future[Either[String, Entity]]

您可以考虑将您的 actor 查询转换为 Flow 以及 mapAsync(具有适当的并行性):

val postgresCheck = (check: CheckEntity) =>
    (postgresActor ? check).mapTo[CheckEntityResult]
      .map {
        case failure: PostgresFailure => Left(failure.message)
        case pEntity: PEntity => Right(check)
      }

val redisCheck = (e: Either[String, CheckEntityResult]) => e match {
    case Left(msg) => Future(Left(msg))
    case Right(checkResult) =>
      (redisActor ? checkResult).mapTo[CheckEntityResult]
        .map {
          case failure: RedisFailure => Left(failure.message)
          case rEntity: REntity => Right(rEntity)
        }
  }

val postgresCheckFlow = (parallelism: Int) =>
    Flow[CheckEntity]
      .mapAsync[Either[String, CheckEntityResult]](parallelism)(postgresCheck)

val redisCheckFlow = (parallelism: Int) =>
    Flow[Either[String, CheckEntityResult]]
      .mapAsync[Either[String, CheckEntityResult]](parallelism)(redisCheck)

使用转换后的流,您的 runnableGraph 可以按如下方式组装,结果类型为 Future[Either[]]:

val runnableGraph = Source.single(CheckEntity(entity))
  .via(postgresCheckFlow(parallelism))
  .via(redisCheckFlow(parallelism))
  ...