Akka SourceQueue 发送列表元素
Akka SourceQueue to send list elements
我有一个 List[String]
和一个 Source.queue
。我想在一段时间后提供这个队列字符串元素。像这样:
val data : List[String] = ""
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(data(??))
有人可以帮我吗?
编辑:我找到了一种方法,但正在寻找更优雅的方法
val tick = Source.tick(0 second, 2 second, "tick").zipWithIndex.limit(data.length)
tick.runForeach(t => {
queue.offer(data(t._2.toInt))
})
要按每个元素的特定时间间隔将 List[String]
中的元素发送到队列,请按以下方式使用 Source#delay
:
val data: List[String] = ???
Source(data)
.delay(2.seconds, DelayOverflowStrategy.backpressure)
.withAttributes(Attributes.inputBuffer(1, 1))
.mapAsync(1)(x => queue.offer(x))
.runWith(Sink.ignore)
使用withAttributes
将输入缓冲区大小设置为1,因为默认值为16,并使用DelayOverflowStrategy.backpressure
。另外,使用 mapAsync
因为 offer
方法 returns a Future
.
或者,使用 Source#throttle
:
Source(data)
.throttle(1, 2.seconds, 1, ThrottleMode.Shaping)
.mapAsync(1)(x => queue.offer(x))
.runWith(Sink.ignore)
我有一个 List[String]
和一个 Source.queue
。我想在一段时间后提供这个队列字符串元素。像这样:
val data : List[String] = ""
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(data(??))
有人可以帮我吗?
编辑:我找到了一种方法,但正在寻找更优雅的方法
val tick = Source.tick(0 second, 2 second, "tick").zipWithIndex.limit(data.length)
tick.runForeach(t => {
queue.offer(data(t._2.toInt))
})
要按每个元素的特定时间间隔将 List[String]
中的元素发送到队列,请按以下方式使用 Source#delay
:
val data: List[String] = ???
Source(data)
.delay(2.seconds, DelayOverflowStrategy.backpressure)
.withAttributes(Attributes.inputBuffer(1, 1))
.mapAsync(1)(x => queue.offer(x))
.runWith(Sink.ignore)
使用withAttributes
将输入缓冲区大小设置为1,因为默认值为16,并使用DelayOverflowStrategy.backpressure
。另外,使用 mapAsync
因为 offer
方法 returns a Future
.
或者,使用 Source#throttle
:
Source(data)
.throttle(1, 2.seconds, 1, ThrottleMode.Shaping)
.mapAsync(1)(x => queue.offer(x))
.runWith(Sink.ignore)