Batching / Framing in Akka Stream

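I have a Source[Animal], where Animal is one of two types, Cat and Dog. The source looks something like dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ... I am trying to convert it into the following Source[Seq[Animal]]: (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ... As the example suggests, a batch holds at most 3 dogs, and a cat always closes the batch as its last element.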

I have been experimenting with batchWeighted and groupedWithin, but I don't have a proper solution yet.

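One idea I tried was to weigh a Dog as 1 and a Cat as 1000 and to use batchWeighted with max weight = 1003, but that does not ensure that a Cat is always the last element of a batch. Trying the same with max weight = 3 always puts the Cat in a separate group.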

If there were a hybrid of batchWithin and takeWhile (one that does not terminate the stream), it would probably have solved this use case.

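This would be a pretty straightforward problem if it were just a matter of iterating over a List, but being restricted to FlowOps makes it a bit challenging.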
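For comparison, the plain-List version could look roughly like the sketch below. The types Animal, Dog and Cat and the helper ListFraming.frame are hypothetical stand-ins (the real types are not shown here), and the limit of 3 dogs per batch is read off the example sequence rather than stated as a rule.

```scala
// Hypothetical stand-ins for the real Animal hierarchy.
sealed trait Animal { def name: String }
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

object ListFraming {
  // A Cat closes the current frame and is its last element.
  // A Dog arriving when the frame already holds maxDogs dogs
  // closes that frame and starts a new one.
  def frame(animals: List[Animal], maxDogs: Int): Vector[Vector[Animal]] = {
    require(maxDogs > 0)
    val start = (Vector.empty[Vector[Animal]], Vector.empty[Animal])
    val (closed, pending) = animals.foldLeft(start) {
      case ((done, open), cat: Cat) =>
        (done :+ (open :+ cat), Vector.empty[Animal])
      case ((done, open), dog: Dog) =>
        // `open` only ever contains dogs, so its size is the dog count.
        if (open.size == maxDogs) (done :+ open, Vector[Animal](dog))
        else (done, open :+ dog)
    }
    // Flush whatever is still open when the input ends.
    if (pending.isEmpty) closed else closed :+ pending
  }
}
```

Applied to the sequence in the question with maxDogs = 3, this yields the six groups listed there. What a strict List lacks is the time dimension: a stream cannot wait indefinitely for the next element before deciding to close a frame.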

Edit: Currently I am doing the following:

  .groupedWithin(4, FiniteDuration(4, SECONDS))
  .map(frameBatch(_, Vector(), 0))
  // groupedWithin internally returns a Vector so is fast for indexed operations

  @tailrec
  private def frameBatch(
      items: Seq[Animal],
      result: Vector[Seq[Animal]],
      offset: Int
    ): Vector[Seq[Animal]] = {
    val index = items.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
    if (index == -1) {
      if (offset == 0) {
        Vector(items)
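      } else if (offset >= items.size) {
        result // nothing is left after the last Cat, so do not append an empty frame
      } else {
        result :+ items.slice(offset, items.size)
      }
    } else {
      // close the frame at the Cat (inclusive) and continue right after it
      frameBatch(items, result :+ items.slice(offset, index + 1), index + 1)
    }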
  }

This can be done with Akka Stream primitives alone (with a slight detour through Akka Actors):

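import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.Source
import scala.concurrent.duration.FiniteDuration

object BatchFrame {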
  def batchFrame[M](
    source: Source[Animal, M],
    batchSize: Int,
    interval: FiniteDuration)(implicit system: ActorSystem): Source[Seq[Animal], M] = {

    require(batchSize > 0)

    import system.dispatcher

    implicit val materializer = ActorMaterializer()

    val dataSource = source.map(x => Some(x))
    val (timerRef, timerSource) = Source.actorRef[Any](1, OverflowStrategy.dropHead).map(_ => None).preMaterialize()

    val merged = dataSource.merge(timerSource, eagerComplete = true)

    var nextTick: Option[Cancellable] = None

    def scheduleTick(): Unit = {
      nextTick = nextTick.flatMap { c => c.cancel(); None }
      nextTick = Option(system.scheduler.scheduleOnce(interval, timerRef, None))
    }

    scheduleTick()

    merged.statefulMapConcat{ () =>
      var dogCount = 0
      var frame: List[Animal] = Nil

      def emit(): List[Seq[Animal]] = {
        scheduleTick()
        val ret = List(frame.reverse)
        dogCount = 0
        frame = Nil
        ret
      }

      def emitWith(a: Animal): List[Seq[Animal]] = {
        frame = a :: frame
        emit()
      }

      in: Option[Animal] => {
        in match {
          case Some(cat: Cat) =>
            emitWith(cat)
          case Some(dog: Dog) if dogCount < (batchSize - 1) =>
            dogCount += 1
            frame = dog :: frame
            Nil
          case Some(dog: Dog) =>
            emitWith(dog)
          case _ =>
            emit()
        }
      }
    }
  }
}

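The main trick (which I had to look up and experiment with to prove to myself) is to pre-materialize the timer Source, so that you have an ActorRef to which the ticks can be scheduled.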
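The framing logic inside statefulMapConcat can be exercised without an ActorSystem by lifting it into a small class. The sketch below is such a model: FrameState, onElement and FrameStateDemo are hypothetical names, the Animal types are stand-ins, None plays the role of a timer tick as in the merged stream above, and the rescheduling of ticks is deliberately left out.

```scala
// Hypothetical stand-ins for the real Animal hierarchy.
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

// Stand-alone model of the statefulMapConcat callback; no timers involved.
final class FrameState(batchSize: Int) {
  require(batchSize > 0)

  private var dogCount = 0
  private var frame: List[Animal] = Nil // newest element first

  // Emits the buffered frame in arrival order and resets the state.
  private def emit(): List[Seq[Animal]] = {
    val ret = List(frame.reverse)
    dogCount = 0
    frame = Nil
    ret
  }

  def onElement(in: Option[Animal]): List[Seq[Animal]] = in match {
    case Some(cat: Cat) => // a Cat always closes the frame
      frame = cat :: frame
      emit()
    case Some(dog: Dog) if dogCount < batchSize - 1 => // room for more dogs
      dogCount += 1
      frame = dog :: frame
      Nil
    case Some(dog: Dog) => // this dog fills the frame
      frame = dog :: frame
      emit()
    case None => // timer tick: flush whatever is buffered
      emit()
  }
}

object FrameStateDemo {
  // Feeds a fixed input through a fresh FrameState.
  def run(batchSize: Int, input: List[Option[Animal]]): List[Seq[Animal]] = {
    val state = new FrameState(batchSize)
    input.flatMap(state.onElement)
  }
}
```

Two things become visible when stepping through this model. A frame is closed eagerly once it holds batchSize dogs, so with batchSize = 3 a cat that follows three dogs ends up in a frame of its own. And a tick that arrives while nothing is buffered produces an empty frame, which callers may want to filter out downstream.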