Apache Flink 从 Kafka 读取 Avro byte[]
Apache Flink read Avro byte[] from Kafka
在查看示例时,我看到了很多这样的内容:
FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
我看到他们这里已经知道架构了。
I do not know the schema until I read the byte[] into a Generic Record
then get the schema. (As it may change from record to record)
有人可以指点我从 byte[]
读取到映射过滤器的 FlinkKafkaConsumer08
以便我可以删除一些前导位,然后将 byte[]
加载到通用记录中吗?
我正在做类似的事情(我使用的是 09 消费者)
在您的主代码中传递您的自定义反序列化器:
FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
parameterTool.getProperties());
自定义反序列化架构读取字节,找出架构 and/or 从架构注册表中检索它,反序列化为 GenericRecord 和 returns GenericRecord 对象。
public class MyDeserializationSchema<T> implements DeserializationSchema<T> {
private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
@Override
public T deserialize(byte[] arg0) throws IOException {
//do your stuff here, strip off your bytes
//deserialize and create your GenericRecord
return (T) (myavroevent);
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avrotype);
}
}
如果您使用 Confluent 的模式注册表,我认为首选解决方案是使用 Confluent 提供的 Avro serde。这样,我们只需调用 deserialize()
即可在后台自动完成要使用的最新版本 Avro 模式的解析,无需字节操作。
它归结为这样的事情(scala 中的示例代码,java 解决方案非常相似):
import io.confluent.kafka.serializers.KafkaAvroDeserializer
...
val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
...
override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): KafkaKV = {
val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
KafkaKV(key, value)
}
...
此方法要求消息生成器也与模式注册表集成并在那里发布模式。这可以通过与上面非常相似的方式完成,使用 Confluent 的 KafkaAvroSerializer
我在这里贴出了详细的解释:How to integrate Flink with Confluent's schema registry
在查看示例时,我看到了很多这样的内容:
FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
我看到他们这里已经知道架构了。
I do not know the schema until I read the byte[] into a Generic Record then get the schema. (As it may change from record to record)
有人可以指点我从 byte[]
读取到映射过滤器的 FlinkKafkaConsumer08
以便我可以删除一些前导位,然后将 byte[]
加载到通用记录中吗?
我正在做类似的事情(我使用的是 09 消费者)
在您的主代码中传递您的自定义反序列化器:
FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
parameterTool.getProperties());
自定义反序列化架构读取字节,找出架构 and/or 从架构注册表中检索它,反序列化为 GenericRecord 和 returns GenericRecord 对象。
public class MyDeserializationSchema<T> implements DeserializationSchema<T> {
private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;
@Override
public T deserialize(byte[] arg0) throws IOException {
//do your stuff here, strip off your bytes
//deserialize and create your GenericRecord
return (T) (myavroevent);
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avrotype);
}
}
如果您使用 Confluent 的模式注册表,我认为首选解决方案是使用 Confluent 提供的 Avro serde。这样,我们只需调用 deserialize()
即可在后台自动完成要使用的最新版本 Avro 模式的解析,无需字节操作。
它归结为这样的事情(scala 中的示例代码,java 解决方案非常相似):
import io.confluent.kafka.serializers.KafkaAvroDeserializer
...
val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
false)
...
override def deserialize(messageKey: Array[Byte], message: Array[Byte],
topic: String, partition: Int, offset: Long): KafkaKV = {
val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
KafkaKV(key, value)
}
...
此方法要求消息生成器也与模式注册表集成并在那里发布模式。这可以通过与上面非常相似的方式完成,使用 Confluent 的 KafkaAvroSerializer
我在这里贴出了详细的解释:How to integrate Flink with Confluent's schema registry