Akka Stream 中的批处理/框架
Batching / Framing in Akka Stream
我有一个 Source[Animal]
,其中 Animal
有两种类型 Cat
和 Dog
。 source
类似于 dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ...
我正在尝试将其转换为以下 Source[Seq[Animal]] - (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ...
这是如何工作的:
- 每批最多 3 只狗,每批最多 1 只猫(或者以下解决方案也可以:每批最多 4 只动物,每批最多 1 只猫)
- 猫只能是批处理中的最终(也称为框架)元素
- 此外,我无法在示例中显示速度,但应该有一个超时,之后仍会发出批次(即使未满且没有猫)。像
groupedWithin(4, FiniteDuration(3, SECONDS))
- 整体秩序很重要,必须保持
我一直在尝试 batchWeighted
和 groupedWithin
,但我还没有合适的解决方案。
我尝试过的一个想法是将 Dog
权衡为 1
,将 Cat
权衡为 1000
,并使用 batchWeighted
和 max weight = 1003
,但是这不能确保 Cat
始终是最后一个批处理元素...尝试对 max weight = 3
进行相同操作总是将 Cat
放在不同的组中。
如果有 batchWithin
和 takeWhile
的混合(没有终止)那么它可能已经解决了这个用例。
如果它只是迭代 List
,这是一个非常直接的解决问题,但被限制使用 FlowOps
使它有点具有挑战性
编辑:
目前我正在做以下事情:
.groupedWithin(4, FiniteDuration(4, SECONDS))
.map(frameBatch(_, Vector(), 0))
// groupedWithin internally returns a Vector so is fast for indexed operations
@tailrec
private def frameBatch(
items: Seq[Animal],
result: Vector[Seq[Animal]],
offset: Int
): Vector[Seq[Animal]] = {
val index = seq.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
if (index == -1) {
if (offset == 0) {
Vector(items)
} else {
result :+ items.slice(offset, items.size)
}
} else {
frameBatchAtSyncs(items, result :+ items.slice(offset, index), index + 1)
}
}
仅使用 Akka Stream 基元就可以做到这一点(稍微绕道 Akka Actors):
object BatchFrame {
def batchFrame[M](
source: Source[Animal, M],
batchSize: Int,
interval: FiniteDuration)(implicit system: ActorSystem): Source[Seq[Animal], M] = {
require(batchSize > 0)
import system.dispatcher
implicit val materializer = ActorMaterializer()
val dataSource = source.map(x => Some(x))
val (timerRef, timerSource) = Source.actorRef[Any](1, OverflowStrategy.dropHead).map(_ => None).preMaterialize()
val merged = dataSource.merge(timerSource, eagerComplete = true)
var nextTick: Option[Cancellable] = None
def scheduleTick(): Unit = {
nextTick = nextTick.flatMap { c => c.cancel(); None }
nextTick = Option(system.scheduler.scheduleOnce(interval, timerRef, None))
}
scheduleTick()
merged.statefulMapConcat{ () =>
var dogCount = 0
var frame: List[Animal] = Nil
def emit(): List[Seq[Animal]] = {
scheduleTick()
val ret = List(frame.reverse)
dogCount = 0
frame = Nil
ret
}
def emitWith(a: Animal): List[Seq[Animal]] = {
frame = a :: frame
emit()
}
in: Option[Animal] => {
in match {
case Some(cat: Cat) =>
emitWith(cat)
case Some(dog: Dog) if dogCount < (batchSize - 1) =>
dogCount += 1
frame = dog :: frame
Nil
case Some(dog: Dog) =>
emitWith(dog)
case _ =>
emit()
}
}
}
}
}
主要技巧(我必须查找并试验以向自己证明)是预先实现时间 Source
,这样您就可以使用 ActorRef
来安排时间刻度。
我有一个 Source[Animal]
,其中 Animal
有两种类型 Cat
和 Dog
。 source
类似于 dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ...
我正在尝试将其转换为以下 Source[Seq[Animal]] - (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ...
这是如何工作的:
- 每批最多 3 只狗,每批最多 1 只猫(或者以下解决方案也可以:每批最多 4 只动物,每批最多 1 只猫)
- 猫只能是批处理中的最终(也称为框架)元素
- 此外,我无法在示例中显示速度,但应该有一个超时,之后仍会发出批次(即使未满且没有猫)。像
groupedWithin(4, FiniteDuration(3, SECONDS))
- 整体秩序很重要,必须保持
我一直在尝试 batchWeighted
和 groupedWithin
,但我还没有合适的解决方案。
我尝试过的一个想法是将 Dog
权衡为 1
,将 Cat
权衡为 1000
,并使用 batchWeighted
和 max weight = 1003
,但是这不能确保 Cat
始终是最后一个批处理元素...尝试对 max weight = 3
进行相同操作总是将 Cat
放在不同的组中。
如果有 batchWithin
和 takeWhile
的混合(没有终止)那么它可能已经解决了这个用例。
如果它只是迭代 List
,这是一个非常直接的解决问题,但被限制使用 FlowOps
使它有点具有挑战性
编辑: 目前我正在做以下事情:
.groupedWithin(4, FiniteDuration(4, SECONDS))
.map(frameBatch(_, Vector(), 0))
// groupedWithin internally returns a Vector so is fast for indexed operations
@tailrec
private def frameBatch(
items: Seq[Animal],
result: Vector[Seq[Animal]],
offset: Int
): Vector[Seq[Animal]] = {
val index = seq.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
if (index == -1) {
if (offset == 0) {
Vector(items)
} else {
result :+ items.slice(offset, items.size)
}
} else {
frameBatchAtSyncs(items, result :+ items.slice(offset, index), index + 1)
}
}
仅使用 Akka Stream 基元就可以做到这一点(稍微绕道 Akka Actors):
object BatchFrame {
def batchFrame[M](
source: Source[Animal, M],
batchSize: Int,
interval: FiniteDuration)(implicit system: ActorSystem): Source[Seq[Animal], M] = {
require(batchSize > 0)
import system.dispatcher
implicit val materializer = ActorMaterializer()
val dataSource = source.map(x => Some(x))
val (timerRef, timerSource) = Source.actorRef[Any](1, OverflowStrategy.dropHead).map(_ => None).preMaterialize()
val merged = dataSource.merge(timerSource, eagerComplete = true)
var nextTick: Option[Cancellable] = None
def scheduleTick(): Unit = {
nextTick = nextTick.flatMap { c => c.cancel(); None }
nextTick = Option(system.scheduler.scheduleOnce(interval, timerRef, None))
}
scheduleTick()
merged.statefulMapConcat{ () =>
var dogCount = 0
var frame: List[Animal] = Nil
def emit(): List[Seq[Animal]] = {
scheduleTick()
val ret = List(frame.reverse)
dogCount = 0
frame = Nil
ret
}
def emitWith(a: Animal): List[Seq[Animal]] = {
frame = a :: frame
emit()
}
in: Option[Animal] => {
in match {
case Some(cat: Cat) =>
emitWith(cat)
case Some(dog: Dog) if dogCount < (batchSize - 1) =>
dogCount += 1
frame = dog :: frame
Nil
case Some(dog: Dog) =>
emitWith(dog)
case _ =>
emit()
}
}
}
}
}
主要技巧(我必须查找并试验以向自己证明)是预先实现时间 Source
,这样您就可以使用 ActorRef
来安排时间刻度。