KSQL STRUCT Fields For TIMESTAMP
Is there any way to use a field nested inside a STRUCT as the TIMESTAMP for a stream?
JSON data:
"Header": {
"StoreID": 10225,
"BusinessDate": "2019-05-03",
"PeriodBusinessDate": "2019-05-03",
"ProcessMode": "Partial"
}
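I tried the statement below, but it gave me this error:
No column with the provided timestamp column name in the WITH clause, HEADER->BUSINESSDATE, exists in the defined schema.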
CREATE STREAM test2 (HEADER STRUCT<StoreID int,BusinessDate VARCHAR>) WITH (KAFKA_TOPIC='hermes__output__tfrema__v1',VALUE_FORMAT='JSON',
timestamp='HEADER->BusinessDate',timestamp_format='yyyy-MMM-dd');
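You can't use a nested field in the TIMESTAMP parameter. You need to extract it into a top-level column first, and then use that column. For example: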
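-- 1. Declare the source stream over the raw JSON topic
CREATE STREAM X (COL1 INT, COL2 VARCHAR, HEADER STRUCT<StoreID int,BusinessDate VARCHAR>)
WITH (KAFKA_TOPIC='hermes__output__tfrema__v1',VALUE_FORMAT='JSON');
-- 2. Promote the nested field to a top-level column
CREATE STREAM Y AS
SELECT COL1, COL2, HEADER->BusinessDate AS BusinessDate, HEADER
FROM X;
-- 3. Re-declare the stream over Y's topic, using the new column as the timestamp
--    ('yyyy-MM-dd' matches values such as "2019-05-03"; 'MMM' would expect a month name)
CREATE STREAM Z (COL1 INT, COL2 VARCHAR, BusinessDate VARCHAR, HEADER STRUCT<StoreID int,BusinessDate VARCHAR>)
WITH (KAFKA_TOPIC='Y',VALUE_FORMAT='JSON',timestamp='BusinessDate',timestamp_format='yyyy-MM-dd');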
If you're using Avro, things get simpler, because the schema doesn't need to be restated:
CREATE STREAM X (COL1 INT, COL2 VARCHAR, HEADER STRUCT<StoreID int,BusinessDate VARCHAR>)
WITH (KAFKA_TOPIC='hermes__output__tfrema__v1',VALUE_FORMAT='JSON');
-- Write the intermediate stream as Avro so its schema is registered
CREATE STREAM Y WITH (VALUE_FORMAT='AVRO')
AS SELECT COL1, COL2, HEADER->BusinessDate AS BusinessDate, HEADER FROM X;
-- No column list needed: the schema is read from the Schema Registry
CREATE STREAM Z
WITH (KAFKA_TOPIC='Y',VALUE_FORMAT='AVRO',timestamp='BusinessDate',timestamp_format='yyyy-MM-dd');
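To check that the extracted field really is being used as the record timestamp, one option is to compare ROWTIME with the BusinessDate column. This is only a rough sketch: it assumes a stream named Z with a top-level BusinessDate column already exists, as in the examples above, and the ROWTIME_STR alias is just an illustrative name. On newer ksqlDB versions the SELECT would likely also need EMIT CHANGES before the LIMIT.

```sql
-- Read the topic from the beginning so existing records are included
SET 'auto.offset.reset' = 'earliest';

-- ROWTIME is the record timestamp in epoch milliseconds;
-- TIMESTAMPTOSTRING renders it so it can be compared with BusinessDate
SELECT ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ROWTIME_STR,
       BusinessDate
FROM Z
LIMIT 5;
```

If the timestamp was picked up, the date part of ROWTIME_STR should line up with BusinessDate for each row, allowing for any time zone offset applied when the date string is parsed.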