KSQL TABLE 或 STREAM 中的字符串字段包含部分原始 JSON 消息
String field in KSQL TABLE or STREAM containing part of original JSON message
是否可以将字符串字段添加到 KSQL table/stream,它将包含原始消息的一部分 JSON。
例如,
原始消息:
{userId:12345,
service:"service-1",
"debug":{
"msg":"Debug message",
"timer": 11.12}
}
因此,我们需要将 userId
映射到 userId BIGINT
,将 service
映射到 service STRING
,将 debug
映射到 debug STRING
,其中将包含 {"msg":"Debug message", "timer": 11.12}
作为字符串。
是的,您可以简单地将其声明为 VARCHAR
。从那里您可以将它视为恰好是 JSON 的字符串,或者您可以使用 EXTRACTJSONFIELD
函数进一步操作它。
向主题发送示例消息:
echo '{"userId":12345, "service":"service-1", "debug":{ "msg":"Debug message", "timer": 11.12} }' | kafkacat -b localhost:9092 -t test_topic -P
声明流:
ksql> CREATE STREAM demo (userid BIGINT, service VARCHAR, debug VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
查询列:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT USERID, SERVICE, DEBUG FROM demo;
12345 | service-1 | {"msg":"Debug message","timer":11.12}
访问嵌套的 JSON 字段:
ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.msg') FROM demo;
12345 | service-1 | Debug message
ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.timer') FROM demo;
12345 | service-1 | 11.12
是否可以将字符串字段添加到 KSQL table/stream,它将包含原始消息的一部分 JSON。
例如,
原始消息:
{userId:12345,
service:"service-1",
"debug":{
"msg":"Debug message",
"timer": 11.12}
}
因此,我们需要将 userId
映射到 userId BIGINT
,将 service
映射到 service STRING
,将 debug
映射到 debug STRING
,其中将包含 {"msg":"Debug message", "timer": 11.12}
作为字符串。
是的,您可以简单地将其声明为 VARCHAR
。从那里您可以将它视为恰好是 JSON 的字符串,或者您可以使用 EXTRACTJSONFIELD
函数进一步操作它。
向主题发送示例消息:
echo '{"userId":12345, "service":"service-1", "debug":{ "msg":"Debug message", "timer": 11.12} }' | kafkacat -b localhost:9092 -t test_topic -P
声明流:
ksql> CREATE STREAM demo (userid BIGINT, service VARCHAR, debug VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
查询列:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT USERID, SERVICE, DEBUG FROM demo;
12345 | service-1 | {"msg":"Debug message","timer":11.12}
访问嵌套的 JSON 字段:
ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.msg') FROM demo;
12345 | service-1 | Debug message
ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.timer') FROM demo;
12345 | service-1 | 11.12