使用 AKKA Stream 解码 Chunked JSON

Decode chunked JSON with AKKA Stream

我有一个来自输入文件的 Source[ByteString, _],像这样有 3 行(实际上输入是一个带有连续流的 TCP 套接字):

{"a":[2
33]
}

现在的问题是我想将其解析为 Source[ChangeMessage,_],但是我发现的唯一示例处理的是每一行都有完整的 JSON 消息,而不是每个 JSON 消息可以分成多行。

我发现的一个例子是这个 this 库,但是它期望 }, 作为最后一个字符,即每行一个 JSON。下面的示例显示了此设置。

"My decoder" should "decode chunked json" in {
    implicit val sys = ActorSystem("test")
    implicit val mat = ActorMaterializer()
    val file = Paths.get("chunked_json_stream.json")
    val data = FileIO.fromPath(file)
    .via(CirceStreamSupport.decode[ChangeMessage])
    .runWith(TestSink.probe[ChangeMessage])
    .request(1)
    .expectComplete()
  }

另一种选择是使用折叠和平衡 },并且仅在整个 JSON 完成时才发出。这样做的问题是折叠运算符仅在流完成时发出,因为这是一个连续的流,我不能在这里使用它。

My question is: What is the fastest way to parse chunked JSON streams in AKKA Stream and are there any available software that already does this? If possible I would like to use circe

不幸的是,我不知道有任何 Scala 库支持 JSON 的基于流的解析。在我看来 Google Gson 对此提供了一些支持,但我不完全确定它可以正确处理 "broken" 输入。

但是,您可以做的是以流方式收集 JSON 文档,类似于 Framing.delimiter 所做的。这与您提到的替代方案非常相似,但它没有使用 fold();如果你这样做,你可能需要模仿 Framing.delimiter 所做的,而不是寻找单个定界符,你需要平衡花括号(如果可能使用顶级数组,还可以选择括号),缓冲中间数据,直到整个文档通过,您可以将其作为适合解析的单个块发出。

作为旁注,适合在 Akka Streams 中使用的流 JSON 解析器的适当接口可能如下所示:

trait Parser {
  def update(data: Array[Byte])  // or String
  def pull(): Option[Either[Error, JsonEvent]]
}

where pull() returns None 如果它不能再读取但是传入文档中没有实际语法错误,并且 JsonEvent 是一些标准结构用于描述流解析器的事件(即具有子类的密封特征,如 BeginObjectBeginArrayEndObjectEndArrayString 等)。如果您找到或创建这样的库,则可以使用它来解析来自 ByteStrings 的 Akka 流的数据。

正如 knutwalker/akka-stream-json 的文档所述:

This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.

在您的情况下,您需要做的只是分隔传入的 ByteString:

"My decoder" should "decode chunked json" in {
    implicit val sys = ActorSystem("test")
    implicit val mat = ActorMaterializer()
    val file = Paths.get("chunked_json_stream.json")

    val sourceUnderTest =
      FileIO.fromPath(file)
        .via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
        .via(CirceStreamSupport.decode[ChangeMessage])

    sourceUnderTest
      .runWith(TestSink.probe[ChangeMessage])
      .request(1)
      .expectNext(ChangeMessage(List(233)))
      .expectComplete()
}

这是因为从文件读取时,ByteString 元素包含多行,因此 Circe 无法解析格式错误的 json。当您用新行分隔时,流中的每个元素都是单独的一行,因此 Circe 能够使用上述功能对其进行解析。