Apache Flink - 如何实现自定义反序列化器实现 DeserializationSchema
Apache Flink - How to implement custom Deserializer implementing DeserializationSchema
我正在使用 Flink,并且正在使用 Kafka 连接器。我从 flink 收到的消息是逗号分隔项的列表。 “'a','b','c',1,0.1 ....'12:01:00.000'”
其中一个包含事件时间,我想将此事件时间用于每个分区的水印(在 kafka 源中),然后将此事件时间用于会话窗口。
我的情况与平常有点不同,因为据我了解,人们通常使用“kafka Timestamps”和 SimpleStringSchema()。在我的例子中,我必须编写自己的反序列化器来实现 DeserializationSchema 和 return 元组或 Pojo。所以基本上用我自己的函数替换 SimpleStringSchema() 。 Flink 开箱即用地提供了一些反序列化器,但我真的不明白如何创建自定义反序列化逻辑。
查看 flink 网站我发现了这个:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
有人给了我一个示例(感谢 David!),但我仍然不知道如何实现我的示例。
我真的需要一个例子来说明我如何为列表做这件事。上面指出的是针对 JSON 的,所以它给了我理论和概念,但我被困在那里了。
你应该像这样介绍POJO
public class Event implements Serializable {
...
private Long timestamp;
}
并实现类似于 link 中的简单反序列化器 - 您可以通过逗号手动拆分消息字符串来解析该行,或者您可以使用 out-of-box csv 阅读器,例如 opencsv,将该行解析为您的 POJO:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
@Override
public ClickEvent deserialize(byte[] message) throws IOException {
String line = new String(message, StandardCharsets.UTF_8);
String[] parts = line.split(",");
Event event = new Event();
// TODO: parts to event here
return event;
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}
我正在使用 Flink,并且正在使用 Kafka 连接器。我从 flink 收到的消息是逗号分隔项的列表。 “'a','b','c',1,0.1 ....'12:01:00.000'” 其中一个包含事件时间,我想将此事件时间用于每个分区的水印(在 kafka 源中),然后将此事件时间用于会话窗口。 我的情况与平常有点不同,因为据我了解,人们通常使用“kafka Timestamps”和 SimpleStringSchema()。在我的例子中,我必须编写自己的反序列化器来实现 DeserializationSchema 和 return 元组或 Pojo。所以基本上用我自己的函数替换 SimpleStringSchema() 。 Flink 开箱即用地提供了一些反序列化器,但我真的不明白如何创建自定义反序列化逻辑。
查看 flink 网站我发现了这个:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
有人给了我一个示例(感谢 David!),但我仍然不知道如何实现我的示例。
我真的需要一个例子来说明我如何为列表做这件事。上面指出的是针对 JSON 的,所以它给了我理论和概念,但我被困在那里了。
你应该像这样介绍POJO
public class Event implements Serializable {
...
private Long timestamp;
}
并实现类似于 link 中的简单反序列化器 - 您可以通过逗号手动拆分消息字符串来解析该行,或者您可以使用 out-of-box csv 阅读器,例如 opencsv,将该行解析为您的 POJO:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
@Override
public ClickEvent deserialize(byte[] message) throws IOException {
String line = new String(message, StandardCharsets.UTF_8);
String[] parts = line.split(",");
Event event = new Event();
// TODO: parts to event here
return event;
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}