akka streams - 在使用 conflate 时触发元素流

akka streams - Triggering the flow of elements while using conflate

我试图通过 Web 套接字创建一个服务,该服务接受 JSON,然后在触发时发出最后接收到的值。我的示例代码是:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, ZipWith}
import akka.stream._
import play.api.libs.json._
import play.api.mvc.WebSocket
import play.core.server.{AkkaHttpServer, Server, ServerConfig}
import play.api.routing.sird._
import scala.io.StdIn

object Trigger extends App {

    import GraphDSL.Implicits._
    implicit val system: ActorSystem = ActorSystem("trigger")
    implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)

    val triggerFilter = Flow[JsValue].filter(json => (json \ "trigger").isDefined)
    val dataFilter = Flow[JsValue].filter(json => (json \ "trigger").isEmpty)

    val triggerBatching = Flow[JsValue].conflate((acc, elem) => elem)
    val dataBatching = Flow[JsValue].conflate((acc, elem) => elem)

    val partial = GraphDSL.create() { implicit builder =>
        val B = builder.add(Broadcast[JsValue](outputPorts = 2))
        val zip = builder.add(ZipWith((msg: JsValue, trigger: JsValue) => msg))

        B ~> dataFilter.async    ~> dataBatching.async    ~> zip.in0
        B ~> triggerFilter.async ~> triggerBatching.async ~> zip.in1

        FlowShape(B.in, zip.out)
    }.named("partial")

    val flow = Flow.fromGraph(partial)

    val BufferSize: Int = 100
    val Port: Int = 9001

    val server: Server = AkkaHttpServer.fromRouterWithComponents(ServerConfig(
        port = Some(Port),
        address = "127.0.0.1"
    )) { components => {
        case GET(p"/ws") => WebSocket.accept[JsValue, JsValue] { request =>
            flow.buffer(size = BufferSize, overflowStrategy = OverflowStrategy.backpressure)
        }
    }}

    if (StdIn.readLine(s"${java.time.Instant.now()} - Press RETURN to stop...\n") != null) {
        server.stop()
    }
}

我试图实现的行为是:

  1. 发送 {"A": 1},什么也没收到
  2. 发送 {"A": 2},什么也没收到
  3. 发送 {"A": 3},什么也没收到
  4. 发送 {"trigger": true},接收 {"A": 3}
  5. 发送 {"trigger": true},什么也没收到

但我看到的是:

  1. 发送 {"A": 1},什么也没收到
  2. 发送 {"A": 2},什么也没收到
  3. 发送 {"A": 3},什么也没收到
  4. 发送 {"trigger": true},接收 {"A": 1}
  5. 发送 {"trigger": true},接收 {"A": 2}
  6. 发送 {"trigger": true},接收 {"A": 3}
  7. 发送 {"trigger": true},什么也没收到

我对 zip 或 conflate 或其他阻止入站 JSON 在收到触发器并响应最新消息之前进行整合的误解是什么?

conflate 适用于下游比上游慢的情况。在您的情况下,上游(即标准输入正在读取的数据)比下游慢,因此不会发生混淆。

当您发送数据时——{"A":1}{"A":2}{"A":3}——您最初什么也没有收到,因为没有相应的元素可以压缩它们。在您发送触发消息之前,这些数据元素是 buffered internally(默认缓冲区大小为 16),它作为向下游发出的压缩对中的第二个元素。

要使用 conflate 查看所需的行为,请使下游比上游慢。这在您的情况下有点棘手,因为您是手动输入流元素。一个想法是向您的流程添加 throttle

val triggerBatching =
  Flow[JsValue]
    .conflate((acc, elem) => elem)
    .throttle(1, 10 seconds)

val dataBatching =
  Flow[JsValue]
    .conflate((acc, elem) => elem)
    .throttle(1, 10 seconds)

然后确保您输入消息的速度足够快。