Consumer_failed_message 在 kafka 流中:记录未从主题推送

Consumer_failed_message in kafka stream: Records not pushed from topic

我有一个流程,从 IBM 大型机 IIDR,我将记录发送到 Kafka 主题。 Kafka topic 消息的 value_format 是 AVRO,key 也是 AVRO 格式。记录被推送到 Kafka 主题中。我有一个与该主题相关的流。但是记录没有传递到流中。 test_iidr 主题示例 -

rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

流中的value_format是AVRO,列名都检查过了。

流创建查询 -

CREATE STREAM test_iidr (
  col1 STRING, 
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING) 
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');

是否由于 WITH 语句中未提及 KEY 而无法从主题加载到流中? 模式注册表中注册了 test_iidr-valuetest_iidr-key 主题。

Kafka-connectdocker中的key.convertervalue.converter设置为-org.apache.kafka.connect.json.JsonConverter。这是 JsonConverter 造成的吗?

我用不同的流创建了一个完全不同的管道,并使用 insert into 语句手动插入相同的数据。有效。只有 IIDR 流不工作,记录没有从主题推入流。

我正在使用 Confluent kafka 版本 5.5.0。

连接配置中的 JsonConverter 很可能会将您的 Avro 数据转换为 JSON。

要确定键和值序列化格式,您可以使用 PRINT 命令(我可以看到您已经 运行)。 PRINT 将在 运行 时输出键和值格式。例如:

ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

所以首先要检查的是通过 PRINT 输出的键和值的格式,然后相应地更新您的 CREATE 语句。

请注意,ksqlDB 支持 Avro/Json 键,因此您可以 want/need 重新分区数据,请参阅:https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-to-do-if-your-key-is-not-set-or-is-in-a-different-format

旁注:如果值的模式存储在模式注册表中,那么您不需要在 CREATE 语句中定义列,因为 ksqlDB 将加载架构注册表中的列

旁注:您不需要在现有主题的 WITH 子句中使用 PARTITIONS=1, REPLICAS=3,只有当您希望 ksqlDB 创建主题时你.