kafka connector是否只支持append only stream
Does kafka connector only support append only stream
我有以下简单的代码片段,想将流 group by
结果写入 kafka 主题。
Kafka sink table定义:
CREATE TABLE sinkTable (
id STRING,
total_price DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'test6',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)
当我运行下面的查询
insert into sinkTable
select id, sum(price)
from sourceTable
group by id
抛出异常,异常为:
Table sink 'default_catalog.default_database.sinkTable' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id], select=[id, SUM(price) AS EXPR])
不知道问题出在哪里。我看起来 connector:kafka
不支持 group by
查询?
问题正如您所描述的那样,默认的 kafka
连接器仅支持附加流。正如您想象的那样,您尝试 运行 的查询将为每个新元素生成更新,因为它会更改具有此 ID 的元素的总和。
最简单的方法之一是使用 upsert-kafka
connector,它会自动处理更新并将它们写入 kafka,但是这个只有 Flink 1.12 之后才可用,所以你可能想更新自己到这个版本,如果你还没有。
我有以下简单的代码片段,想将流 group by
结果写入 kafka 主题。
Kafka sink table定义:
CREATE TABLE sinkTable (
id STRING,
total_price DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'test6',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)
当我运行下面的查询
insert into sinkTable
select id, sum(price)
from sourceTable
group by id
抛出异常,异常为:
Table sink 'default_catalog.default_database.sinkTable' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id], select=[id, SUM(price) AS EXPR])
不知道问题出在哪里。我看起来 connector:kafka
不支持 group by
查询?
问题正如您所描述的那样,默认的 kafka
连接器仅支持附加流。正如您想象的那样,您尝试 运行 的查询将为每个新元素生成更新,因为它会更改具有此 ID 的元素的总和。
最简单的方法之一是使用 upsert-kafka
connector,它会自动处理更新并将它们写入 kafka,但是这个只有 Flink 1.12 之后才可用,所以你可能想更新自己到这个版本,如果你还没有。