使用 zipWith 对带有触发信号的 Akka Stream 的最新元素进行采样?

Sample most recent element of Akka Stream with trigger signal, using zipWith?

我有一个 Planning 系统,可以根据客户订单计算出某种全局 Schedule。当客户向该系统下订单或撤销订单时,或者当计划内事件使用的某些资源变得不可用时,该计划会随着时间的推移而变化。

现在另一个系统需要知道 Schedule 中某些事件的状态。系统在一个消息队列上发送一个 StatusRequest(EventName),我必须对另一个队列上的相应 StatusSignal(EventStatus) 做出反应。

Planning 系统给了我一个 akka-streams Source[Schedule],它在时间表改变时发出一个 Schedule,我也有一个 Source[StatusRequest],我从中收到StatusRequests 和一个 Sink[StatusSignal],我可以向其发送 StatusSignal 回复。

每当我收到 StatusRequest 时,我必须检查当前调度,即 Source[Schedule] 发出的最新值,然后将 StatusSignal 发送到下沉.

我想到了下面的流程

scheduleSource
  .zipWith(statusRequestSource) { (schedule, statusRequest) => 
    findEventStatus(schedule, statusRequest.eventName))
  }
  .map(eventStatus => makeStatusSignal(eventStatus))
  .runWith(statusSignalSink)

但我完全不确定此流程何时真正发出值以及它是否真正实现了我的要求(请参阅上面的粗体文本)。

zipWith reference 说(强调我的):

emits when all of the inputs have an element available

这是什么意思?当 statusRequestSource 发出一个值时,流是否也等到 scheduleSource 发出?或者它是否使用最后发出的值 scheduleSource?同样,当 scheduleSource 发出一个值时会发生什么?它是否会触发带有 statusRequestSource 中最后一个元素的状态信号?

如果流程没有实现我所需要的,我该如何实现呢?

为了回答关于 zipWith 行为的第一组问题,这里有一个简单的测试:

val source1 = Source(1 to 5)
val source2 = Source(1 to 3)

source1
  .zipWith(source2){ (s1Elem, s2Elem) => (s1Elem, s2Elem) }
  .runForeach(println)

// prints:
// (1,1)
// (2,2)
// (3,3)

zipWith 将向下游发射,只要两个输入都有各自的元素可以压缩在一起。

满足您要求的一个想法是分离 scheduleSourcestatusRequestSource。将 scheduleSource 提供给 actor,并让 actor 跟踪它从流中接收到的最新元素。然后有 statusRequestSource query this actor,它将用 scheduleSource 中的最新元素回复。这个演员可能看起来像下面这样:

class LatestElementTracker extends Actor with ActorLogging {

  var latestSchedule: Option[Schedule] = None

  def receive = {
    case schedule: Schedule =>
      latestSchedule = Some(schedule)
    case status: StatusRequest =>
      if (latestSchedule.isEmpty) {
        log.debug("No schedules have been received yet.")
      } else {
        val eventStatus = findEventStatus(latestSchedule.get, status.eventName)
        sender() ! eventStatus
      }
  }
}

要与上述演员整合:

scheduleSource.runForeach(s => trackerActor ! s)

statusRequestSource
  .ask[EventStatus](parallelism = 1)(trackerActor) // adjust parallelism as needed
  .map(eventStatus => makeStatusSignal(eventStatus))
  .runWith(statusSignalSink)