Consumer_failed_message 在 kafka 流中:记录未从主题推送
Consumer_failed_message in kafka stream: Records not pushed from topic
我有一个流程,从 IBM 大型机 IIDR,我将记录发送到 Kafka 主题。 Kafka topic 消息的 value_format
是 AVRO,key 也是 AVRO 格式。记录被推送到 Kafka 主题中。我有一个与该主题相关的流。但是记录没有传递到流中。
test_iidr
主题示例 -
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
流中的value_format是AVRO,列名都检查过了。
流创建查询 -
CREATE STREAM test_iidr (
col1 STRING,
col2 DECIMAL(2,0),
col3 DECIMAL(1,0),
iidr_tran_type STRING,
iidr_a_ccid STRING,
iidr_a_user STRING,
iidr_src_upd_ts STRING,
iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');
是否由于 WITH
语句中未提及 KEY
而无法从主题加载到流中?
模式注册表中注册了 test_iidr-value
和 test_iidr-key
主题。
Kafka-connect
docker中的key.converter
和value.converter
设置为-org.apache.kafka.connect.json.JsonConverter
。这是 JsonConverter
造成的吗?
我用不同的流创建了一个完全不同的管道,并使用 insert into
语句手动插入相同的数据。有效。只有 IIDR 流不工作,记录没有从主题推入流。
我正在使用 Confluent kafka 版本 5.5.0。
连接配置中的 JsonConverter
很可能会将您的 Avro 数据转换为 JSON。
要确定键和值序列化格式,您可以使用 PRINT
命令(我可以看到您已经 运行)。 PRINT
将在 运行 时输出键和值格式。例如:
ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
所以首先要检查的是通过 PRINT 输出的键和值的格式,然后相应地更新您的 CREATE
语句。
请注意,ksqlDB 还 支持 Avro/Json 键,因此您可以 want/need 重新分区数据,请参阅:https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-to-do-if-your-key-is-not-set-or-is-in-a-different-format
旁注:如果值的模式存储在模式注册表中,那么您不需要在 CREATE 语句中定义列,因为 ksqlDB 将加载架构注册表中的列
旁注:您不需要在现有主题的 WITH
子句中使用 PARTITIONS=1, REPLICAS=3
,只有当您希望 ksqlDB 创建主题时你.
我有一个流程,从 IBM 大型机 IIDR,我将记录发送到 Kafka 主题。 Kafka topic 消息的 value_format
是 AVRO,key 也是 AVRO 格式。记录被推送到 Kafka 主题中。我有一个与该主题相关的流。但是记录没有传递到流中。
test_iidr
主题示例 -
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
流中的value_format是AVRO,列名都检查过了。
流创建查询 -
CREATE STREAM test_iidr (
col1 STRING,
col2 DECIMAL(2,0),
col3 DECIMAL(1,0),
iidr_tran_type STRING,
iidr_a_ccid STRING,
iidr_a_user STRING,
iidr_src_upd_ts STRING,
iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');
是否由于 WITH
语句中未提及 KEY
而无法从主题加载到流中?
模式注册表中注册了 test_iidr-value
和 test_iidr-key
主题。
Kafka-connect
docker中的key.converter
和value.converter
设置为-org.apache.kafka.connect.json.JsonConverter
。这是 JsonConverter
造成的吗?
我用不同的流创建了一个完全不同的管道,并使用 insert into
语句手动插入相同的数据。有效。只有 IIDR 流不工作,记录没有从主题推入流。
我正在使用 Confluent kafka 版本 5.5.0。
连接配置中的 JsonConverter
很可能会将您的 Avro 数据转换为 JSON。
要确定键和值序列化格式,您可以使用 PRINT
命令(我可以看到您已经 运行)。 PRINT
将在 运行 时输出键和值格式。例如:
ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
所以首先要检查的是通过 PRINT 输出的键和值的格式,然后相应地更新您的 CREATE
语句。
请注意,ksqlDB 还 支持 Avro/Json 键,因此您可以 want/need 重新分区数据,请参阅:https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-to-do-if-your-key-is-not-set-or-is-in-a-different-format
旁注:如果值的模式存储在模式注册表中,那么您不需要在 CREATE 语句中定义列,因为 ksqlDB 将加载架构注册表中的列
旁注:您不需要在现有主题的 WITH
子句中使用 PARTITIONS=1, REPLICAS=3
,只有当您希望 ksqlDB 创建主题时你.