如何将参数传递给 apache beam (KafkaIO) 中的 avro 解串器?

How to pass parameters to an avro deserializer in apache beam (KafkaIO)?

我需要在运行时下载 AVRO 模式,我需要传递 bootstrap 服务器和 kafka 主题来解析正确的模式,但我找不到在反序列化器上传递这些参数的方法(除了对它们进行硬编码)。你知道怎么做吗?

    val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
    ops.setKafkaTopic(pars.kafkaTopic)
    ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
    ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
    val p = Pipeline.create(ops)

    p.apply( KafkaIO.read<String, Measurement>()
            .withTopic(pars.kafkaTopic)
            .withBootstrapServers(pars.kafkaBootstrapServers)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializer(RemoteAvroDeserializer::class.java)
            .withoutMetadata()
    )
            .apply(Values.create())
            (TransformToMeasurementFN()))
            .apply(
                    Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
            .apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
            .apply(Count.perElement())
            .apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))

    p.run()

这是我的解串器:

class RemoteAvroDeserializer : Deserializer<Measurement> {
 val decoder: BinaryMessageDecoder<Measurement>

 public constructor() {
     val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic","tst_bootstrap_servers")
     decoder = Measurement.createDecoder(schemaStore)
 }

 override fun deserialize(s: String, bytes: ByteArray): Measurement {
     return decoder.decode(bytes)
 }

 override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
 }

 override fun close() {
 }
}

根据 Beam 文档,您可以像这样设置消费者配置

  KafkaIO... 
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))

我假设您可以在此处添加 schema.registry.url 或其他内容