处理 Akka 流中丢失的消息
Processing Dropped Message In Akka Streams
我有以下源队列定义。
lazy val (processMessageSource, processMessageQueueFuture) =
peekMatValue(
Source
.queue[(ProcessMessageInputData, Promise[ProcessMessageOutputData])](5, OverflowStrategy.dropNew))
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
处理消息输入数据 Class 本质上是一个工件,它是在调用者调用 Web 服务器端点时创建的,它连接到此流(即服务端点的业务逻辑将消息放入此队列) .处理消息输出的 Promise 是在应用程序的接收器中下游完成的事情,然后 Web 服务器在这个未来有一个 on complete 回调到 return 响应。
还有其他来源进入该流。
现在可以备份缓冲区,因为其他源可能会使系统过载,从而触发流背压。现有代码只是丢弃新消息。但我仍然想完成流程消息输出承诺,以异常说明 "Throttled" 之类的内容完成。
是否有一种机制可以编写自定义溢出策略,或对溢出元素进行 post 处理以允许我执行此操作?
dropNew 就可以了。在客户端它看起来像。
processMessageQueue.offer(in, pr).foreach { res =>
res match {
case Enqueued => // Code to handle case when successfully enqueued.
case Dropped => // Code to handle messages that are dropped since the buffier was overflowing.
}
}
我有以下源队列定义。
lazy val (processMessageSource, processMessageQueueFuture) =
peekMatValue(
Source
.queue[(ProcessMessageInputData, Promise[ProcessMessageOutputData])](5, OverflowStrategy.dropNew))
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
处理消息输入数据 Class 本质上是一个工件,它是在调用者调用 Web 服务器端点时创建的,它连接到此流(即服务端点的业务逻辑将消息放入此队列) .处理消息输出的 Promise 是在应用程序的接收器中下游完成的事情,然后 Web 服务器在这个未来有一个 on complete 回调到 return 响应。
还有其他来源进入该流。
现在可以备份缓冲区,因为其他源可能会使系统过载,从而触发流背压。现有代码只是丢弃新消息。但我仍然想完成流程消息输出承诺,以异常说明 "Throttled" 之类的内容完成。
是否有一种机制可以编写自定义溢出策略,或对溢出元素进行 post 处理以允许我执行此操作?
dropNew 就可以了。在客户端它看起来像。
processMessageQueue.offer(in, pr).foreach { res =>
res match {
case Enqueued => // Code to handle case when successfully enqueued.
case Dropped => // Code to handle messages that are dropped since the buffier was overflowing.
}
}