使用 AKKA Stream 解码 Chunked JSON
Decode chunked JSON with AKKA Stream
我有一个来自输入文件的 Source[ByteString, _]
,像这样有 3 行(实际上输入是一个带有连续流的 TCP 套接字):
{"a":[2
33]
}
现在的问题是我想将其解析为 Source[ChangeMessage,_]
,但是我发现的唯一示例处理的是每一行都有完整的 JSON 消息,而不是每个 JSON 消息可以分成多行。
我发现的一个例子是这个 this 库,但是它期望 }
或 ,
作为最后一个字符,即每行一个 JSON。下面的示例显示了此设置。
"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val data = FileIO.fromPath(file)
.via(CirceStreamSupport.decode[ChangeMessage])
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectComplete()
}
另一种选择是使用折叠和平衡 }
,并且仅在整个 JSON 完成时才发出。这样做的问题是折叠运算符仅在流完成时发出,因为这是一个连续的流,我不能在这里使用它。
My question is: What is the fastest way to parse chunked JSON streams
in AKKA Stream and are there any available software that already does
this? If possible I would like to use circe
不幸的是,我不知道有任何 Scala 库支持 JSON 的基于流的解析。在我看来 Google Gson 对此提供了一些支持,但我不完全确定它可以正确处理 "broken" 输入。
但是,您可以做的是以流方式收集 JSON 文档,类似于 Framing.delimiter
所做的。这与您提到的替代方案非常相似,但它没有使用 fold()
;如果你这样做,你可能需要模仿 Framing.delimiter
所做的,而不是寻找单个定界符,你需要平衡花括号(如果可能使用顶级数组,还可以选择括号),缓冲中间数据,直到整个文档通过,您可以将其作为适合解析的单个块发出。
作为旁注,适合在 Akka Streams 中使用的流 JSON 解析器的适当接口可能如下所示:
trait Parser {
def update(data: Array[Byte]) // or String
def pull(): Option[Either[Error, JsonEvent]]
}
where pull()
returns None
如果它不能再读取但是传入文档中没有实际语法错误,并且 JsonEvent
是一些标准结构用于描述流解析器的事件(即具有子类的密封特征,如 BeginObject
、BeginArray
、EndObject
、EndArray
、String
等)。如果您找到或创建这样的库,则可以使用它来解析来自 ByteString
s 的 Akka 流的数据。
正如 knutwalker/akka-stream-json 的文档所述:
This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.
在您的情况下,您需要做的只是分隔传入的 ByteString:
"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val sourceUnderTest =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
.via(CirceStreamSupport.decode[ChangeMessage])
sourceUnderTest
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectNext(ChangeMessage(List(233)))
.expectComplete()
}
这是因为从文件读取时,ByteString 元素包含多行,因此 Circe 无法解析格式错误的 json。当您用新行分隔时,流中的每个元素都是单独的一行,因此 Circe 能够使用上述功能对其进行解析。
我有一个来自输入文件的 Source[ByteString, _]
,像这样有 3 行(实际上输入是一个带有连续流的 TCP 套接字):
{"a":[2
33]
}
现在的问题是我想将其解析为 Source[ChangeMessage,_]
,但是我发现的唯一示例处理的是每一行都有完整的 JSON 消息,而不是每个 JSON 消息可以分成多行。
我发现的一个例子是这个 this 库,但是它期望 }
或 ,
作为最后一个字符,即每行一个 JSON。下面的示例显示了此设置。
"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val data = FileIO.fromPath(file)
.via(CirceStreamSupport.decode[ChangeMessage])
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectComplete()
}
另一种选择是使用折叠和平衡 }
,并且仅在整个 JSON 完成时才发出。这样做的问题是折叠运算符仅在流完成时发出,因为这是一个连续的流,我不能在这里使用它。
My question is: What is the fastest way to parse chunked JSON streams in AKKA Stream and are there any available software that already does this? If possible I would like to use circe
不幸的是,我不知道有任何 Scala 库支持 JSON 的基于流的解析。在我看来 Google Gson 对此提供了一些支持,但我不完全确定它可以正确处理 "broken" 输入。
但是,您可以做的是以流方式收集 JSON 文档,类似于 Framing.delimiter
所做的。这与您提到的替代方案非常相似,但它没有使用 fold()
;如果你这样做,你可能需要模仿 Framing.delimiter
所做的,而不是寻找单个定界符,你需要平衡花括号(如果可能使用顶级数组,还可以选择括号),缓冲中间数据,直到整个文档通过,您可以将其作为适合解析的单个块发出。
作为旁注,适合在 Akka Streams 中使用的流 JSON 解析器的适当接口可能如下所示:
trait Parser {
def update(data: Array[Byte]) // or String
def pull(): Option[Either[Error, JsonEvent]]
}
where pull()
returns None
如果它不能再读取但是传入文档中没有实际语法错误,并且 JsonEvent
是一些标准结构用于描述流解析器的事件(即具有子类的密封特征,如 BeginObject
、BeginArray
、EndObject
、EndArray
、String
等)。如果您找到或创建这样的库,则可以使用它来解析来自 ByteString
s 的 Akka 流的数据。
正如 knutwalker/akka-stream-json 的文档所述:
This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.
在您的情况下,您需要做的只是分隔传入的 ByteString:
"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val sourceUnderTest =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
.via(CirceStreamSupport.decode[ChangeMessage])
sourceUnderTest
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectNext(ChangeMessage(List(233)))
.expectComplete()
}
这是因为从文件读取时,ByteString 元素包含多行,因此 Circe 无法解析格式错误的 json。当您用新行分隔时,流中的每个元素都是单独的一行,因此 Circe 能够使用上述功能对其进行解析。