在使用 Kafka 和 Apache Beam 的上下文中,编码器和 Kafka 反序列化器有什么区别?
In the context of using Kafka and Apache Beam, what is the difference between a Coder and Kafka Deserializer?
我是 Apache Beam 的新手。我正在尝试根据文档使用 KafKaIO
从 Kafka 读取数据。在创建 PCollection
期间,withValueDeserializerAndCoder
方法允许您设置编码器和反序列化器。我不明白为什么我们可能需要解串器和编码器。在我看来,两者都是关于将字节流表示为 java 对象。那么为什么我们需要两者呢?是不是因为 Beam 更像是一个允许多个 runner 下面的框架?
是的,这有点棘手,乍一看并不明显。您需要有一个 Kafka Deserializer
(或者 Serializer
,如果您写入 Kafka)来将键和值字节解释为您从 Kafka 读取的 Java 个对象。同时,Beam 要求我们提供 Coder
s 来在运行时物化我们 PCollection
s 的中间数据。
编码器与(反)序列化器(负责解释 Kafka 消息)无关,因此我们需要明确提供编码器。虽然,KafkaIO
会尝试从反序列化器中推断出一个编码器,并且在许多情况下它会隐式工作,但是如果它失败或者你想提供一个特定的编码器,那么你可以单独指定它。
例如,如果您的 Kafka 消息以 Avro 格式序列化,您可以使用 KafkaAvroDeserializer
和内部 Beam AvroCoder
.
public static void main(String[] args) {
...
KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(MyClass.class));
...
}
@DefaultCoder(AvroCoder.class)
public class MyClass {
String name;
String age;
MyClass() {}
MyClass(String n, String a) {
this.name = n;
this.age = a;
}
}
我是 Apache Beam 的新手。我正在尝试根据文档使用 KafKaIO
从 Kafka 读取数据。在创建 PCollection
期间,withValueDeserializerAndCoder
方法允许您设置编码器和反序列化器。我不明白为什么我们可能需要解串器和编码器。在我看来,两者都是关于将字节流表示为 java 对象。那么为什么我们需要两者呢?是不是因为 Beam 更像是一个允许多个 runner 下面的框架?
是的,这有点棘手,乍一看并不明显。您需要有一个 Kafka Deserializer
(或者 Serializer
,如果您写入 Kafka)来将键和值字节解释为您从 Kafka 读取的 Java 个对象。同时,Beam 要求我们提供 Coder
s 来在运行时物化我们 PCollection
s 的中间数据。
编码器与(反)序列化器(负责解释 Kafka 消息)无关,因此我们需要明确提供编码器。虽然,KafkaIO
会尝试从反序列化器中推断出一个编码器,并且在许多情况下它会隐式工作,但是如果它失败或者你想提供一个特定的编码器,那么你可以单独指定它。
例如,如果您的 Kafka 消息以 Avro 格式序列化,您可以使用 KafkaAvroDeserializer
和内部 Beam AvroCoder
.
public static void main(String[] args) {
...
KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(MyClass.class));
...
}
@DefaultCoder(AvroCoder.class)
public class MyClass {
String name;
String age;
MyClass() {}
MyClass(String n, String a) {
this.name = n;
this.age = a;
}
}