Apache Flink:如何使用 Java 映射流(或包含 DTO 的映射流)?

Apache Flink: How do I use a stream of Java Map (or Map containing DTOs)?

我正在使用 Flink,有一个 JSON 字符串流到达我的系统,其中包含动态变化的字段和嵌套字段。所以我不能模拟并将这个传入的 JSON 转换为静态 POJO,我必须依赖 Map 来代替。

我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将地图包装在名为 Data 的 DTO 中。

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

在这个转换处理之后,问题出现了,我试图将结果流输入我的自定义 Flink 接收器。 invoke 函数不会在接收器中被调用。但是,如果我从这个包含 DTO 的 Map 更改为原始或没有 Map 的常规 DTO,则接收器可以工作。

我的 DTO 如下所示:

public class FakeDTO {
    private String id;
    private LinkedTreeMap map; // com.google.gson.internal

    // getters and setters
    // constructors, empty and with fields

我尝试了以下两种解决方案:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

有专家建议我可以在这种情况下使用吗?

我能够解决这个问题。在我的 Flink 日志中,我看到没有找到一个名为 ReflectionSerializerFactory class 的 Kryo 文件。我在 Maven 中更新了 Kryo 版本,并为我的地图使用了 Flink 文档说 Flink 支持的地图类型。

只需确保在您的代码中指定了泛型类型,并在地图的 POJO 中添加 getter 和 setter。

我还使用 .returns(xyz.class) 类型减速来避免类型擦除的影响。