在使用 Kafka 和 Apache Beam 的上下文中,编码器和 Kafka 反序列化器有什么区别?

In the context of using Kafka and Apache Beam, what is the difference between a Coder and Kafka Deserializer?

我是 Apache Beam 的新手。我正在尝试根据文档使用 KafKaIO 从 Kafka 读取数据。在创建 PCollection 期间,withValueDeserializerAndCoder 方法允许您设置编码器和反序列化器。我不明白为什么我们可能需要解串器和编码器。在我看来,两者都是关于将字节流表示为 java 对象。那么为什么我们需要两者呢?是不是因为 Beam 更像是一个允许多个 runner 下面的框架?

是的,这有点棘手,乍一看并不明显。您需要有一个 Kafka Deserializer(或者 Serializer,如果您写入 Kafka)来将键和值字节解释为您从 Kafka 读取的 Java 个对象。同时,Beam 要求我们提供 Coders 来在运行时物化我们 PCollections 的中间数据。

编码器与(反)序列化器(负责解释 Kafka 消息)无关,因此我们需要明确提供编码器。虽然,KafkaIO 会尝试从反序列化器中推断出一个编码器,并且在许多情况下它会隐式工作,但是如果它失败或者你想提供一个特定的编码器,那么你可以单独指定它。

例如,如果您的 Kafka 消息以 Avro 格式序列化,您可以使用 KafkaAvroDeserializer 和内部 Beam AvroCoder.

public static void main(String[] args) {
...
  KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
          AvroCoder.of(MyClass.class));
...
}

@DefaultCoder(AvroCoder.class) 
public class MyClass {

  String name;
  String age;

  MyClass() {}

  MyClass(String n, String a) {
    this.name = n;
    this.age = a;
  }
}