How to pass parameters to an avro deserializer in apache beam (KafkaIO)?
I need to download the AVRO schema at runtime, and to resolve the correct schema I have to pass the bootstrap servers and the Kafka topic to the deserializer, but I can't find a way to pass these parameters to the deserializer other than hard-coding them. Do you know how to do this?
val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
ops.setKafkaTopic(pars.kafkaTopic)
ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
val p = Pipeline.create(ops)

p.apply(KafkaIO.read<String, Measurement>()
        .withTopic(pars.kafkaTopic)
        .withBootstrapServers(pars.kafkaBootstrapServers)
        .withKeyDeserializer(StringDeserializer::class.java)
        .withValueDeserializer(RemoteAvroDeserializer::class.java)
        .withoutMetadata()
    )
    .apply(Values.create())
    .apply(ParDo.of(TransformToMeasurementFN()))
    .apply(Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
    .apply("FilterOrderMeasurement", ParDo.of<Measurement, String>(RemoveRendersFn()))
    .apply(Count.perElement())
    .apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))

p.run()
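For context, SerializationOptions is a custom PipelineOptions interface (not shown above). Given the setters used, it is roughly of this shape; this is a sketch, and the String types are assumed:

// Sketch of the SerializationOptions interface referenced above. Beam pipeline
// options are declared as getter/setter pairs on an interface extending PipelineOptions.
interface SerializationOptions : PipelineOptions {
    fun getKafkaTopic(): String
    fun setKafkaTopic(value: String)

    fun getKafkaBootstrapServers(): String
    fun setKafkaBootstrapServers(value: String)

    fun getKafkaSchemaRegistry(): String
    fun setKafkaSchemaRegistry(value: String)
}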
Here is my deserializer:
class RemoteAvroDeserializer : Deserializer<Measurement> {
    val decoder: BinaryMessageDecoder<Measurement>

    constructor() {
        // The topic and bootstrap servers are hard-coded here; these are exactly
        // the values I want to pass in at runtime instead.
        val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic", "tst_bootstrap_servers")
        decoder = Measurement.createDecoder(schemaStore)
    }

    override fun deserialize(s: String, bytes: ByteArray): Measurement {
        return decoder.decode(bytes)
    }

    override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
    }

    override fun close() {
    }
}
According to the Beam documentation, you can set consumer config properties like this:
KafkaIO...
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
I assume you could add schema.registry.url or whatever else you need there.
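A minimal sketch of how that could fit together, assuming the entries passed via withConsumerConfigUpdates end up in the map that Kafka hands to Deserializer.configure(). The key my.app.kafka.topic is made up for illustration, and bootstrap.servers should already be present in that map because withBootstrapServers sets it:

// Sketch: pass the runtime values through the consumer config instead of hard-coding them.
p.apply(
    KafkaIO.read<String, Measurement>()
        .withTopic(pars.kafkaTopic)
        .withBootstrapServers(pars.kafkaBootstrapServers)
        .withKeyDeserializer(StringDeserializer::class.java)
        .withValueDeserializer(RemoteAvroDeserializer::class.java)
        // "my.app.kafka.topic" is a made-up key, not a real consumer property.
        .withConsumerConfigUpdates(mapOf<String, Any>("my.app.kafka.topic" to pars.kafkaTopic))
        .withoutMetadata()
)

// The deserializer then builds its decoder in configure(), which receives the consumer config.
class RemoteAvroDeserializer : Deserializer<Measurement> {
    private lateinit var decoder: BinaryMessageDecoder<Measurement>

    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
        val conf = requireNotNull(configs)
        val topic = conf["my.app.kafka.topic"] as String             // made-up key from above
        val bootstrapServers = conf["bootstrap.servers"].toString()  // populated by withBootstrapServers
        decoder = Measurement.createDecoder(RemoteKafkaSchemaRegistry(topic, bootstrapServers))
    }

    override fun deserialize(topic: String, bytes: ByteArray): Measurement = decoder.decode(bytes)

    override fun close() {}
}

With this shape nothing is hard-coded in the deserializer's constructor; each worker builds its decoder from the config it is handed.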