根据优先级对未来的请求进行计算

Making Calculation on Future requests based on priority

我有一个关于处理异步操作和根据优先级采取行动的问题。

考虑以下代码:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Some[Int]] = Future {
  Some(1)
}

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] = {

  val domainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, domainHash))
  val subDomainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, subdomainHash))

  // Is there other way?
  for {
    res <- Future.sequence(
      List(
        subDomainFutures.map(res => "subdomain" -> res),
        domainFutures.map(res => "domain" -> res)
      )
    )
  } yield {
    val subdomainExclusion: List[Int] = res.filter(_._1 == "subdomain").flatMap(_._2).flatten
    val domainExclusion: List[Int] = res.filter(_._1 == "domain").flatMap(_._2).flatten
    if (subdomainExclusion.nonEmpty) {
      s"its subdomain exclusion with results: ${subdomainExclusion}"
    }
    else {
      s"its domain exclusion with results: ${domainExclusion}"
    }
  }
}

我想达到的目标:

注意:子域只等待一个结果是可选优化。

有没有更漂亮的方法来实现这个? 谢谢!

所以你想同时获取域和子域,你也想同时执行尽可能多的isSiteExcludedAtList。此外,如果至少有一个子域名,您想要取消这些域名。

可以很容易地使用 cats-effectfs2 来表示 IO
(以下代码假定 isSiteExcludedAtList return 是 IO[Option[Int]]

import cats.effect.IO
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.SignallingRef

def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
  def parallelStreamFor(siteId: Int): Stream[IO, Int] =
    Stream
      .emits(listIds)
      .covary[IO]
      .parEvalMapUnordered(maxConcurrent = 2)(listId => isSiteExcludedAtList(listId, siteId))
      .collect {
        case Some(result) => result
      }
  
  SignallingRef[IO].of(false).flatMap { signal =>
    val processSubdomains =
      parallelStreamFor(siteId = subdomainHash)
        .evalTap(_ => signal.set(true))
        .compile
        .toList

    val processDomains =
      parallelStreamFor(siteId = domainHash)
        .interruptWhen(signal)
        .compile
        .toList

    (processSubdomains,processDomains).parTupled
  } flatMap {
    case (subdomainExclusions, domainExclusions) =>
      if (subdomainExclusions.nonEmpty)
        IO.println(s"Its subdomain exclusion with result: ${subdomainExclusions}")
      else if (domainExclusions.nonEmpty)
        IO.println(s"Its domain exclusion with result: ${domainExclusions}")
      else
        IO.println("All subdomains and domains are included!")
  }
}

几个注意事项:

  • 如果元素的顺序很重要,则将 parEvalMapUnordered 替换为 parEvalMap,这样效率会低一些。
  • 调整 maxConcurrent 的值,使其符合您的工作量。
  • 如果您希望保持每个流同步并且只是 运行 两者同时发生,我们可以将 parEvalMapUnordered + parEvalMapUnordered 替换为对 evalMapFilter[=62 的单个调用=]
  • 由于 IO.fromFutureIO.unsafeToFuture()
  • ,您可以轻松地将其集成到您的代码库中,而无需进行太多重构

可以看到代码运行ning here.


编辑

OLD AND WRONG ANSWER

如果我理解正确的话,你想在第一个结果处停止处理 return a Some

如果您愿意使用 cats-effect,这很容易实现,如下所示:

import cats.effect.IO
import cats.syntax.all._

def isSiteExcludedAtList(listId: Int, siteId: Int): IO[Option[Int]] =
  IO.println(s"Computing for ${listId} - ${siteId}").as(Some(10))

def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
  val processSubdomains =
    listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = subdomainHash))

  val processDomains =
    listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = domainHash))

  processSubdomains.flatMap {
    case Some(subdomainExclusion) =>
      IO.println(s"Its subdomain exclusion with result: ${subdomainExclusion}")

    case None =>
      processDomains.flatMap {
        case Some(domainExclusion) =>
          IO.println(s"Its domain exclusion with result: ${domainExclusion}")

        case None =>
          IO.println("All subdomains and domains are included!")
      }
  }
}

可以看到代码 运行ning here

Note: Another approach would be to tag each computation with is origin (domain, or subdomain) and combine all them in a big list and perform a single collectFirstSomeM both are equivalent.

我想描述如何在仍然使用 futures 的同时改进您的代码,但我对这段代码的作用有点困惑。 isSiteExcludedAtList return 这个数字是多少?它是一个标识符,你想收集所有列表id的标识符,你只关心你不想使用domainHash查询是否足以使用subdomainHash?这就是您的代码似乎正在做的事情,但是如果我正确理解上面的答案,即带有 cats-effect 和 collectFirstSomeM 的答案,那么该代码只会查找第一个结果 Some(number)然后停下来。例如,如果第一次调用 isSiteExcludedAtList 将 return Some(1) 那么我们就不会再调用任何东西了。

所以,我有三个答案给你。

  1. 这是如果你想收集一个整数列表,你只想避免调用 isSiteExcludedAtListdomainHash 如果调用 subdomainHash 已经给你一些结果。在这种情况下,您可以同时链接 Future.traverse 并仅在第一个 return 没有结果时才调用第二个。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Some[Int]] = 
  Future { Some(1) }

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  for {
    res1   <- Future.traverse(listIds)(isSiteExcludedAtList(_, subdomainHash))
    subIds =  res1.flatten
    res2   <- if (subIds.isEmpty) 
                Future.traverse(listIds)(isSiteExcludedAtList(_, domainHash)) 
              else 
                Future.successful(Nil)
    domIds =  res2.flatten
  } yield 
    if (subIds.nonEmpty)
      s"its subdomain exclusion with results: ${subIds}"
    else if (domIds.nonEmpty)
      s"its domain exclusion with results: ${domIds}"
    else
      "no exclusion"
  1. 这是如果你查找第一个表明listId被排除的结果,然后你不想再查询了。在这种情况下,所有对 isSiteExcludedAtList 的调用都必须链接起来,即只有在前一个调用没有结果时才调用下一个调用。可以用递归来完成:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] = 
  Future { Some(1) }

def isSiteExcludedAtList(listIds: List[Int], hash: Int): Future[Option[Int]] = 
  listIds match {
    case Nil => 
      Future.successful(None)
    case head :: tail => 
      isSiteExcludedAtList(head, hash).flatMap {
        case Some(id) => Future.successful(Some(id))
        case None     => isSiteExcludedAtList(tail, hash)
      }
  }

// if you use Scala 3, change this to an enum
sealed trait Exclusion
final case class SubdomainExclusion(id: Int) extends Exclusion
final case class DomainExclusion(id: Int) extends Exclusion
case object NoExclusion extends Exclusion

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  isSiteExcludedAtList(listIds, subdomainHash).flatMap {
    case Some(id) => 
      Future.successful(SubdomainExclusion(id))
    case None     => 
      isSiteExcludedAtList(listIds, domainHash).map {
        case Some(id) => DomainExclusion(id)
        case None     => NoExclusion
      }
  }.map {
    case SubdomainExclusion(id) => s"subdomain exclusion $id"
    case DomainExclusion(id)    => s"domain exclusion: $id"
    case NoExclusion            => "no exclusion"
  }
  1. 第三种可能性是不使用 Future.traverse 并分别请求每个 listId,您将实现一个查询,该查询将 return 给定哈希的所有排除 ID - subdomainHashdomainHash,然后您只需检查您的 listIds 和由该查询编辑的 ID return 是否是 non-empty。该代码与我的第一个答案中的代码类似,但它只会对数据库进行两次调用。我写它是因为根据我的经验,这是处理数据库的一种常见模式:我们有一些已经编写的查询,随着我们的代码变得越来越复杂,我们开始在循环中使用这些查询,这导致 sub-optimal 性能,而我们可以编写更复杂的查询,我们只调用一次。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtListBulk(siteId: Int): Future[Set[Int]] = 
  Future { Set(10, 20, 30) }

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  for {
    excludedSubIds <- isSiteExcludedAtListBulk(subdomainHash)
    subIds         =  listIds.filter(excludedSubIds)
    excludedDomIds <- if (subIds.isEmpty) 
                        isSiteExcludedAtListBulk(domainHash)
                      else 
                        Future.successful(Set.empty)
    domIds         =  listIds.filter(excludedDomIds)
  } yield 
    if (subIds.nonEmpty)
      s"its subdomain exclusion with results: ${subIds}"
    else if (domIds.nonEmpty)
      s"its domain exclusion with results: ${domIds}"
    else
      "no exclusion"

也许是这样的?

    subdomainFutures.map(_.flatten).flatMap { 
       case sds if (sds.nonEmpty) => Future.successful(sds -> Nil)
       case _ => domainFutures.map(_.flatten).map(Nil -> _)
    }.map {
      case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
      case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
      case _ => "no exclusion"
    }

或者,也许,将域查询也提升到同一级别:

    subdomainFutures.zip(domainFutures)
      .map { case (s,d) = (s.flatten, d.flatten) }
      .map {
        case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
        case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
        case _ => "no exclusion"
      }

我认为,这或多或少与您所做的相同,只是在 IMO 中以更直接的方式表达。

一个缺点是它会等待所有子域查询返回,即使第一个 returns 结果(第二个变体看起来有点“光滑”,但它也会等待所有域查询无条件地,这是一个额外的低效率)。

有很多方法可以优化它(没有什么是不可能的!)但我想不出任何对我来说对用例来说看起来不会过于复杂的方法。