回放历史数据Akka Stream
Playback historical data Akka Stream
是否可以使用 Akka Streams 根据定义的时钟发出数据?或者他们只是在数据到达时就发出(忽略背压)?我特别想知道是否可以使用 "mocked" 时钟回放历史数据,也许可以使用 Source.tick
。
这取决于你"defined clock"的意思。
实际挂墙时间
正如您所提到的,Source.tick
是一种从系统时钟获取实际时间的时钟的可能性。问题是 Sink
可能不会以大于或等于源生成滴答的间隔的速率发出需求信号。例如,您的 Sink 可能每分钟只发出一次需求信号,但您在 Source.tick 中的间隔可能是 10 seconds
。在这种情况下,文档中的 5 个中间刻度将被删除:
If a consumer has not requested any elements at the point in time when
the tick element is produced it will not receive that tick element
later.
模拟时间
总是可以使用 Source
来模拟时间。
我们可以先创建一个函数,使用开始时间、结束时间和间隔来模拟时钟:
type MillisFromEpoch = Long
type MillisInterval = Long
val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] =
(startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] {
var currentTime = startTime
override def hasNext : Boolean = currentTime < stopTime
override def next() : MillisFromEpoch = {
val returnMilis = currentTime
currentTime += interval
return returnMillis
}
}
这个时钟现在可以提供源。例如,我们可以创建一个从 unix epoch 开始并递增 1 秒直到时间结束的时钟:
val epoch : MillisFromEpoch = 0L
val second : MillisInterval = 1000L
val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] =
Source fromIterator clock(epoch, Long.MaxValue, 1*second)
或者我们可以创建一个时钟,从现在开始,以 5 秒为间隔在 60 秒后结束:
val now : MillisFromEpoch = System.currentTimeMillis()
val simulatedClockFromNowSource : Source[MillisFromEpoch,_] =
Source fromIterator clock(now, now + 60*second, 5*second)
采样频率
有一种方法可以使用 Source.tick,即使下游消费者比在源中指定的滴答间隔慢。我们可以创建一个 Flow.filter
,它不断向 Source 发出需求信号,但只会通过定义增量间隔的时间。
我们可以从一个使用内部变量跟踪时间间隔的函数开始:
val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean =
(interval) => {
var lastValidTime : MillisFromEpoch = -1
(timeToCheck) => {
if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) {
lastValidTime = timeToCheck
true
}
else {
false
}
}
}
现在这个函数可以用来创建流:
val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] =
(frequency) => Flow[MillisFromEpoch] filter frequencySample(frequency)
现在我们可以创建一个频率较慢(例如 10 秒)的流,它附加到频率较高(例如 1 秒)的源:
val slowFrequency : MillisInterval = 10 * second
//simulatedClockFromEpoch ticks every 1 second
//frequnencySampleFlow only passes every 10 second tick through
val slowSource =
simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)
是否可以使用 Akka Streams 根据定义的时钟发出数据?或者他们只是在数据到达时就发出(忽略背压)?我特别想知道是否可以使用 "mocked" 时钟回放历史数据,也许可以使用 Source.tick
。
这取决于你"defined clock"的意思。
实际挂墙时间
正如您所提到的,Source.tick
是一种从系统时钟获取实际时间的时钟的可能性。问题是 Sink
可能不会以大于或等于源生成滴答的间隔的速率发出需求信号。例如,您的 Sink 可能每分钟只发出一次需求信号,但您在 Source.tick 中的间隔可能是 10 seconds
。在这种情况下,文档中的 5 个中间刻度将被删除:
If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later.
模拟时间
总是可以使用 Source
来模拟时间。
我们可以先创建一个函数,使用开始时间、结束时间和间隔来模拟时钟:
type MillisFromEpoch = Long
type MillisInterval = Long
val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] =
(startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] {
var currentTime = startTime
override def hasNext : Boolean = currentTime < stopTime
override def next() : MillisFromEpoch = {
val returnMilis = currentTime
currentTime += interval
return returnMillis
}
}
这个时钟现在可以提供源。例如,我们可以创建一个从 unix epoch 开始并递增 1 秒直到时间结束的时钟:
val epoch : MillisFromEpoch = 0L
val second : MillisInterval = 1000L
val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] =
Source fromIterator clock(epoch, Long.MaxValue, 1*second)
或者我们可以创建一个时钟,从现在开始,以 5 秒为间隔在 60 秒后结束:
val now : MillisFromEpoch = System.currentTimeMillis()
val simulatedClockFromNowSource : Source[MillisFromEpoch,_] =
Source fromIterator clock(now, now + 60*second, 5*second)
采样频率
有一种方法可以使用 Source.tick,即使下游消费者比在源中指定的滴答间隔慢。我们可以创建一个 Flow.filter
,它不断向 Source 发出需求信号,但只会通过定义增量间隔的时间。
我们可以从一个使用内部变量跟踪时间间隔的函数开始:
val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean =
(interval) => {
var lastValidTime : MillisFromEpoch = -1
(timeToCheck) => {
if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) {
lastValidTime = timeToCheck
true
}
else {
false
}
}
}
现在这个函数可以用来创建流:
val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] =
(frequency) => Flow[MillisFromEpoch] filter frequencySample(frequency)
现在我们可以创建一个频率较慢(例如 10 秒)的流,它附加到频率较高(例如 1 秒)的源:
val slowFrequency : MillisInterval = 10 * second
//simulatedClockFromEpoch ticks every 1 second
//frequnencySampleFlow only passes every 10 second tick through
val slowSource =
simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)