How to Construct a Nested JSON Message on an Output Topic in ksqlDB
I receive the following event payloads from one of our source systems, and I created Stream1 for the JSON payload below.
Event JSON 1
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"40000",
"apr":"2.6%",
"minpayment":"400",
"status":"Approved"
}
}
}
}
Event JSON 2
{
"event": {
"header": {
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234","customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123","accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"offeramount":"70000",
"apr":"3.6%",
"minpayment":"600",
"status":"Rejected"
}
}
}
}
I created an aggregate table on top of Stream1 above:
CREATE TABLE EVENT_TABLE AS
SELECT
avg(minpayment) as Avg_MinPayment,
avg(apr) AS Avg_APr,
avg(offeramount) AS Avgofferamount ,
status
FROM STREAM1
GROUP BY status
EMIT CHANGES;
Status | Avg_MinPayment | Avg_APr | Avgofferamount
-----------------------------------------
Approved | 400 | 2.6% | 40000
Rejected | 600 | 3.6% | 70000
I get the result above from the KTable, and the JSON on the KTable's topic looks like this:
Aggregate JSON 1
PRINT 'EVENT_TABLE';
{
"Status" : "Approved",
"Avg_Minpayment" : "400",
"Avg_APr" : "2.6%",
"offeramount" : "40000"
}
Aggregate JSON 2
{
"Status" : "Rejected",
"Avg_Minpayment" : "600",
"Avg_APr" : "3.6%",
"offeramount" : "70000"
}
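But I have to construct the final target JSON and publish it to the output topic in the format below. I have to add the header and body to aggregate JSON 1 and aggregate JSON 2.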
{
"event":{
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
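}
}
It's not very clear what you're trying to achieve, because your example SQL would not produce the example output from the example input. In fact, your example SQL would fail with unknown column errors.
Something like the following would generate your example output: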
CREATE TABLE EVENT_TABLE AS
SELECT
eventDetails->status as status,
avg(eventDetails->minpayment) as Avg_MinPayment,
avg(eventDetails->apr) AS Avg_APr,
avg(eventDetails->offeramount) AS Avgofferamount
FROM STREAM1
GROUP BY eventDetails->status
EMIT CHANGES;
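The query above assumes that eventDetails is a top-level column and that its fields are numeric. In the sample payloads, eventDetails is nested under event->body and every value is a JSON string (apr even carries a % sign), so AVG would probably need explicit casts. Here is a minimal sketch under those assumptions. The topic name events, the table name EVENT_AVG and the STREAM1 schema are hypothetical, since the question does not show the original CREATE STREAM statement:

```sql
-- Hypothetical declaration of STREAM1; the topic name 'events' is an assumption.
-- The customerIdentifiers and accountIdentifiers arrays are left undeclared
-- because the aggregation does not use them.
CREATE STREAM STREAM1 (
  event STRUCT<
    header STRUCT<name VARCHAR, version VARCHAR, producer VARCHAR,
                  channel VARCHAR, countryCode VARCHAR>,
    body STRUCT<
      eventDetails STRUCT<offeramount VARCHAR, apr VARCHAR,
                          minpayment VARCHAR, status VARCHAR>
    >
  >
) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');

-- Average per status, casting the string fields to DOUBLE first.
-- The '%' is stripped from apr before the cast, so the result is a plain
-- number (for example 2.6), not the string '2.6%'.
CREATE TABLE EVENT_AVG AS
SELECT
  event->body->eventDetails->status AS status,
  AVG(CAST(event->body->eventDetails->minpayment AS DOUBLE)) AS Avg_MinPayment,
  AVG(CAST(REPLACE(event->body->eventDetails->apr, '%', '') AS DOUBLE)) AS Avg_APr,
  AVG(CAST(event->body->eventDetails->offeramount AS DOUBLE)) AS Avgofferamount
FROM STREAM1
GROUP BY event->body->eventDetails->status
EMIT CHANGES;
```

If the real stream already declares these fields as numeric types, the casts are unnecessary and only the nested paths matter.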
Next, your example output...
Status | Avg_MinPayment | Avg_APr | Avgofferamount
-----------------------------------------
Approved | 400 | 2.6% | 40000
Rejected | 600 | 3.6% | 70000
...has one row per status. Yet the output you say you want to achieve...
{
"event":{
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"Key":[
{"Status":"approved","Avg_Minpayment":"400","Avg_APr":"2.6%","offeramount":"40000"},
{"Status":"rejected","Avg_Minpayment":"600","Avg_APr":"3.6%","offeramount":"70000"}
]
}
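}
}
...contains both statuses, i.e. it combines your two example input messages into a single output.
If I understand you correctly and you do want to output the JSON above, then:
First you need to include the event information. But which event information? If you know it will always be the same, then you can use: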
CREATE TABLE EVENT_TABLE AS
SELECT
eventDetails->status as status,
latest_by_offset(event) as event,
avg(eventDetails->minpayment) as Avg_MinPayment,
avg(eventDetails->apr) AS Avg_APr,
avg(eventDetails->offeramount) AS Avgofferamount
FROM STREAM1
GROUP BY eventDetails->status
EMIT CHANGES;
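The latest_by_offset aggregate function captures the event information from the last message it saw. However, I'm not convinced this is what you want. Couldn't you receive other Rejected and Approved messages that carry different event information? If it is the event information that identifies which messages should be grouped together, then something like this might get you close to what you want: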
CREATE TABLE EVENT_TABLE AS
SELECT
event,
collect_list(eventDetails) as body
FROM STREAM1
GROUP BY event
EMIT CHANGES;
If this is close, then you may need to use the STRUCT constructor and the AS_VALUE function to restructure your output. For example:
CREATE TABLE EVENT_TABLE AS
SELECT
event as key,
AS_VALUE(event) as event,
STRUCT(
keys := collect_list(eventDetails)
) as body
FROM STREAM1
GROUP BY event
EMIT CHANGES;
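One caveat when grouping by the whole event: if the event column also contains the nested body, as it does in the sample payloads, the two sample messages differ in eventDetails and would land in separate groups. Grouping by the header alone may be closer to the intent. The sketch below is an assumption-laden illustration, not a tested solution. It assumes a hypothetical STREAM1 schema exposing event->header and event->body->eventDetails, and a ksqlDB version that supports STRUCT keys with a non-KAFKA key format. The table name EVENT_BY_HEADER is made up. It collects only the status strings so that it does not rely on COLLECT_LIST accepting STRUCT arguments:

```sql
-- Group by the header only, so messages that share a header are combined
-- even when their eventDetails differ.
-- KEY_FORMAT = 'JSON' is set because the KAFKA key format cannot
-- serialize a STRUCT key.
CREATE TABLE EVENT_BY_HEADER
  WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
  event->header AS header_key,                 -- becomes the table's key
  AS_VALUE(event->header) AS header,           -- copy of the key in the value
  COLLECT_LIST(event->body->eventDetails->status) AS statuses
FROM STREAM1
GROUP BY event->header
EMIT CHANGES;
```

If your ksqlDB version accepts STRUCT arguments in COLLECT_LIST, you could collect event->body->eventDetails itself instead of only the status.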