我应该如何定义 Flink 的 Schema 以从 Pulsar 读取 Protocol Buffer 数据
How should I define Flink's Schema to read Protocol Buffer data from Pulsar
我正在使用 Pulsar-Flink 从 Flink 中的 Pulsar 读取数据。当数据格式为 Protocol Buffer 时,我遇到了困难。
在GitHub首页,Pulsar-Flink正在使用SimpleStringSchema
。但是,貌似它并没有正式遵守 Protocol Buffer。有谁知道如何处理数据格式?我应该如何定义模式?
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "test-source-topic")
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
DataStream<String> stream = see.addSource(source);
// chain operations on dataStream of String and sink the output
// end method chaining
see.execute();
仅供参考,我正在编写 Scala 代码,因此如果您的解释是针对 Scala(而不是 Java),那将非常有帮助。当然,欢迎任何建议!包括 Java.
您应该实现自己的 DeserializationSchema
。假设您有一个 protobuf 消息地址并生成了相应的 Java class。然后架构应如下所示:
public class ProtoDeserializer implements DeserializationSchema<Address> {
@Override
public TypeInformation<Address> getProducedType() {
return TypeInformation.of(Address.class);
}
@Override
public Address deserialize(byte[] message) throws IOException {
return Address.parseFrom(message);
}
@Override
public boolean isEndOfStream(Address nextElement) {
return false;
}
}
我正在使用 Pulsar-Flink 从 Flink 中的 Pulsar 读取数据。当数据格式为 Protocol Buffer 时,我遇到了困难。
在GitHub首页,Pulsar-Flink正在使用SimpleStringSchema
。但是,貌似它并没有正式遵守 Protocol Buffer。有谁知道如何处理数据格式?我应该如何定义模式?
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "test-source-topic")
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
DataStream<String> stream = see.addSource(source);
// chain operations on dataStream of String and sink the output
// end method chaining
see.execute();
仅供参考,我正在编写 Scala 代码,因此如果您的解释是针对 Scala(而不是 Java),那将非常有帮助。当然,欢迎任何建议!包括 Java.
您应该实现自己的 DeserializationSchema
。假设您有一个 protobuf 消息地址并生成了相应的 Java class。然后架构应如下所示:
public class ProtoDeserializer implements DeserializationSchema<Address> {
@Override
public TypeInformation<Address> getProducedType() {
return TypeInformation.of(Address.class);
}
@Override
public Address deserialize(byte[] message) throws IOException {
return Address.parseFrom(message);
}
@Override
public boolean isEndOfStream(Address nextElement) {
return false;
}
}