我应该如何定义 Flink 的 Schema 以从 Pulsar 读取 Protocol Buffer 数据

How should I define Flink's Schema to read Protocol Buffer data from Pulsar

我正在使用 Pulsar-Flink 从 Flink 中的 Pulsar 读取数据。当数据格式为 Protocol Buffer 时,我遇到了困难。

在GitHub首页,Pulsar-Flink正在使用SimpleStringSchema。但是,貌似它并没有正式遵守 Protocol Buffer。有谁知道如何处理数据格式?我应该如何定义模式?

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "test-source-topic")
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);

DataStream<String> stream = see.addSource(source);

// chain operations on dataStream of String and sink the output
// end method chaining

see.execute();

仅供参考,我正在编写 Scala 代码,因此如果您的解释是针对 Scala(而不是 Java),那将非常有帮助。当然,欢迎任何建议!包括 Java.

您应该实现自己的 DeserializationSchema。假设您有一个 protobuf 消息地址并生成了相应的 Java class。然后架构应如下所示:

public class ProtoDeserializer implements DeserializationSchema<Address> {
    @Override
    public TypeInformation<Address> getProducedType() {
        return TypeInformation.of(Address.class);
    }

    @Override
    public Address deserialize(byte[] message) throws IOException {
        return Address.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(Address nextElement) {
        return false;
    }
}