回放历史数据Akka Stream

Playback historical data Akka Stream

是否可以使用 Akka Streams 根据定义的时钟发出数据?或者他们只是在数据到达时就发出(忽略背压)?我特别想知道是否可以使用 "mocked" 时钟回放历史数据,也许可以使用 Source.tick

这取决于你"defined clock"的意思。

实际挂墙时间

正如您所提到的,Source.tick 是一种从系统时钟获取实际时间的时钟的可能性。问题是 Sink 可能不会以大于或等于源生成滴答的间隔的速率发出需求信号。例如,您的 Sink 可能每分钟只发出一次需求信号,但您在 Source.tick 中的间隔可能是 10 seconds。在这种情况下,文档中的 5 个中间刻度将被删除:

If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later.

模拟时间

总是可以使用 Source 来模拟时间。

我们可以先创建一个函数,使用开始时间、结束时间和间隔来模拟时钟:

type MillisFromEpoch = Long

type MillisInterval = Long


val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] = 
  (startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] {
    var currentTime = startTime

    override def hasNext : Boolean = currentTime < stopTime

    override def next() : MillisFromEpoch = {
      val returnMilis = currentTime
      currentTime += interval

      return returnMillis
    }
  }

这个时钟现在可以提供源。例如,我们可以创建一个从 unix epoch 开始并递增 1 秒直到时间结束的时钟:

val epoch : MillisFromEpoch = 0L

val second : MillisInterval = 1000L

val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] = 
  Source fromIterator clock(epoch, Long.MaxValue, 1*second)

或者我们可以创建一个时钟,从现在开始,以 5 秒为间隔在 60 秒后结束:

val now : MillisFromEpoch = System.currentTimeMillis()

val simulatedClockFromNowSource : Source[MillisFromEpoch,_] = 
  Source fromIterator clock(now, now + 60*second, 5*second)

采样频率

有一种方法可以使用 Source.tick,即使下游消费者比在源中指定的滴答间隔慢。我们可以创建一个 Flow.filter,它不断向 Source 发出需求信号,但只会通过定义增量间隔的时间。

我们可以从一个使用内部变量跟踪时间间隔的函数开始:

val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean = 
  (interval) => {

    var lastValidTime : MillisFromEpoch = -1

    (timeToCheck) => {
      if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) {
        lastValidTime = timeToCheck

        true
      }
      else {
        false
      }
    }
  }

现在这个函数可以用来创建流:

val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] =
  (frequency) => Flow[MillisFromEpoch]  filter frequencySample(frequency)

现在我们可以创建一个频率较慢(例如 10 秒)的流,它附加到频率较高(例如 1 秒)的源:

val slowFrequency : MillisInterval = 10 * second

//simulatedClockFromEpoch ticks every 1 second
//frequnencySampleFlow only passes every 10 second tick through
val slowSource = 
  simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)