重放实时收集的数据以模拟真实的流量延迟和消息排序
Replay data collected live to simulate real traffic delays and ordering of messages
有两个输入流,都生成定义为
的对象实例
case class ReplayData(timestamp:Long, payload:Any)
流 1
1,载荷 1
1000,有效载荷3
流 2
400, payload2
1500, payload4
我想实现重播机制
会 将元素按我在每个元素上的时间戳排序向下游推送
它将模拟生产中的现场场景。
此机制需要遵守消息之间的延迟,例如第一条消息发送是 payload1(它的起点),Stream2 的 payload2 应该在 400 毫秒后发送(下一条消息时间戳和初始消息时间戳之间的差异)等等。
我可以很容易地使用 DelayedQueue which usage is explained in this SO thread
An unbounded blocking queue of Delayed elements, in which an element
can only be taken when its delay has expired.
The head of the queue is that Delayed element whose delay expired
furthest in the past. If no delay has expired there is no head and
poll will return null.
Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS)
method returns a value less than or equal to zero. Even though
unexpired elements cannot be removed using take or poll, they are
otherwise treated as normal elements.
For example, the size method returns the count of both expired and
unexpired elements. This queue does not permit null elements. does
not permit null elements.
我正在尝试弄清楚如何在 Akka 流中做到这一点,但很难找到可以为我解决这个问题的东西。
我正在寻找 mergeSorted 合并两个流并根据某些功能对它们进行排序的方法。
而且它似乎或多或少符合基于某些自定义函数进行排序的目的。
我不确定如何根据时间戳处理元素之间的延迟属性。
使用普通的旧 AKKA,我可以使用调度程序来读取数据,对它们进行排序并安排每个元素在时间过去时发送。
我不记得 akka-streams 中有任何内容可以延迟开箱即用的消息,并为每条消息自定义延迟。毕竟 akka-streams 背后的想法是响应式编程。我只知道 2 个选项如何从总体上解决你的问题(假设你已经合并了 2 个来源)
Flow.mapAsync - 在这种情况下,延迟一段时间后 return 和 Future
完全是你的事。例如:
import java.time.LocalDateTime
import java.util.concurrent.Executors
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
object Application extends App {
implicit val sys: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
case class SomeEntity(time: Int, value: Int)
val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))
val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val scheduler = sys.scheduler
val f = source
.mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec))
.runForeach(se => println(s"${LocalDateTime.now()} -> $se"))
f.onComplete(_ => sys.terminate())
}
可能你的用例(毕竟是模拟)实际上并没有那么严格,所以你可能会使用Flow.throttle。它不像第一个解决方案那么简单和精确,但它的性能更高,因为它使用一些轻量级的桶模型来控制项目输出率。
import java.time.LocalDateTime
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Application extends App {
implicit val sys: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
case class SomeEntity(time: Int, value: Int)
val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))
val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => {
println(s"${LocalDateTime.now()} -> $se")
})
future.onComplete(_ => sys.terminate())
}
有两个输入流,都生成定义为
的对象实例case class ReplayData(timestamp:Long, payload:Any)
流 1
1,载荷 1
1000,有效载荷3
流 2
400, payload2
1500, payload4
我想实现重播机制 会 将元素按我在每个元素上的时间戳排序向下游推送
它将模拟生产中的现场场景。
此机制需要遵守消息之间的延迟,例如第一条消息发送是 payload1(它的起点),Stream2 的 payload2 应该在 400 毫秒后发送(下一条消息时间戳和初始消息时间戳之间的差异)等等。
我可以很容易地使用 DelayedQueue which usage is explained in this SO thread
An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will return null.
Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements. does not permit null elements.
我正在尝试弄清楚如何在 Akka 流中做到这一点,但很难找到可以为我解决这个问题的东西。
我正在寻找 mergeSorted 合并两个流并根据某些功能对它们进行排序的方法。
而且它似乎或多或少符合基于某些自定义函数进行排序的目的。
我不确定如何根据时间戳处理元素之间的延迟属性。
使用普通的旧 AKKA,我可以使用调度程序来读取数据,对它们进行排序并安排每个元素在时间过去时发送。
我不记得 akka-streams 中有任何内容可以延迟开箱即用的消息,并为每条消息自定义延迟。毕竟 akka-streams 背后的想法是响应式编程。我只知道 2 个选项如何从总体上解决你的问题(假设你已经合并了 2 个来源)
Flow.mapAsync - 在这种情况下,延迟一段时间后 return 和
Future
完全是你的事。例如:import java.time.LocalDateTime import java.util.concurrent.Executors import akka.NotUsed import akka.actor.ActorSystem import akka.pattern.after import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} object Application extends App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class SomeEntity(time: Int, value: Int) val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3)) val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) val scheduler = sys.scheduler val f = source .mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec)) .runForeach(se => println(s"${LocalDateTime.now()} -> $se")) f.onComplete(_ => sys.terminate()) }
可能你的用例(毕竟是模拟)实际上并没有那么严格,所以你可能会使用Flow.throttle。它不像第一个解决方案那么简单和精确,但它的性能更高,因为它使用一些轻量级的桶模型来控制项目输出率。
import java.time.LocalDateTime import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ object Application extends App { implicit val sys: ActorSystem = ActorSystem() implicit val mat: ActorMaterializer = ActorMaterializer() case class SomeEntity(time: Int, value: Int) val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3)) val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => { println(s"${LocalDateTime.now()} -> $se") }) future.onComplete(_ => sys.terminate()) }