使用 JsonTreeReader 在 ConsumeKafkaRecord 中设置 schema.name
Setting schema.name in ConsumeKafkaRecord with JsonTreeReader
我正在尝试使用 ConsumeKafkaRecord
处理器从 Kafka 读取数据,其中 JsonTreeReader
作为 reader。
我需要将 schema.name
设置为我的 AvroRegistry
控制器服务器中的架构名称。
但是在从 Kafka 读取数据之前我该怎么做呢?
我还尝试将 schema.name
设置为静态值,甚至尝试 reader 中的 schema.text
属性(策略设置为读取 schema.text
) 到原始模式 json,它仍然出错,抱怨它在流文件中找不到 schema.name
。
我将如何使用 ConsumeKafkaRecord
和 JsonTreeReader
?
通常,如果您有一个带有“用户模式名称”的 reader,则“模式名称”默认为“${schema.name}”,这意味着从传入的属性中获取模式名称流文件,其中属性名为“schema.name”。
由于 ConsumerKafkaRecord(以及所有其他消费 Kafka 处理器)是源处理器,因此没有带有属性的传入流文件,因此您无法引用 ${schema.name}。您必须将 JsonTreeReader 中的“架构名称”设置为实际架构名称,而不是动态值。
我正在尝试使用 ConsumeKafkaRecord
处理器从 Kafka 读取数据,其中 JsonTreeReader
作为 reader。
我需要将 schema.name
设置为我的 AvroRegistry
控制器服务器中的架构名称。
但是在从 Kafka 读取数据之前我该怎么做呢?
我还尝试将 schema.name
设置为静态值,甚至尝试 reader 中的 schema.text
属性(策略设置为读取 schema.text
) 到原始模式 json,它仍然出错,抱怨它在流文件中找不到 schema.name
。
我将如何使用 ConsumeKafkaRecord
和 JsonTreeReader
?
通常,如果您有一个带有“用户模式名称”的 reader,则“模式名称”默认为“${schema.name}”,这意味着从传入的属性中获取模式名称流文件,其中属性名为“schema.name”。
由于 ConsumerKafkaRecord(以及所有其他消费 Kafka 处理器)是源处理器,因此没有带有属性的传入流文件,因此您无法引用 ${schema.name}。您必须将 JsonTreeReader 中的“架构名称”设置为实际架构名称,而不是动态值。