Flink 序列化:POJO 类型与 GenericType

Flink Serialization: POJO type vs. GenericType

在我的 Flink 应用程序中,我使用 java.time.Instant 来表示 UTC 时间戳。该应用程序 运行 很好,但我最近在 Flink 日志中注意到这条消息:

"Class class java.time.Instant cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on \"数据类型和序列化\"了解对性能影响的详细信息。"

当我去阅读文档时,没有太多关于使用像 Instant 这样的东西的性能影响的讨论。我的一般理解是必须使用 Kryo 而不是 Flink 内置的序列化器。我目前使用的是 Flink 1.6,看到 Flink 1.7 及以上版本似乎有一个 InstantSerializer class。这是否意味着如果我升级 Flink 版本,我使用 Instant 的 POJO 将不再需要作为 GenericType 处理?

一般来说,什么是最好的java class来表示时间?有没有办法使用 Instant 并减轻或消除对性能的任何影响?

日志信息有点误导,但您的理解是正确的。 Instant 在 Flink 1.6 中使用 Kryo 序列化。

在 Flink 1.7+ 中,Instant 将被序列化为 InstantSerializer,而不是 KryoSerializer

您的 POJO 是否会被如此对待,并不取决于 Instant 将如何在您的 POJO 中序列化。该消息只是说系统试图查看 Instant 是否是 POJO。

示例:

    public class SpecialMomentWithName {
        private String name;
        public Instant specialMoment;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

SpecialMomentWithName 在 Flink 中将始终作为 POJO 处理。

在微基准测试中使用 Kryo 与新的 InstanceSerializer 进行序列化 Instant 时,您可能会发现性能略有下降。 您的 Flink 作业的性能是否会从这种变化中受益很难预测:如果 Instant 的序列化成本消耗了您大部分 CPU 的时间(并且您的作业受 CPU 限制),那么我希望性能有所提高。 如果您的网络或硬盘(使用 RocksDB 时)是限制因素,我不希望性能有所提高。

我不会优化 Instance 序列化的性能,除非对您实际失去性能的地方进行了一些分析。如果您发现您的性能受到像这样的序列化时间的影响,您可以尝试将 Instance 表示为 long。这将降低代码的可读性,并且您可能需要额外的 CPU 周期来在类型之间进行转换。