Apache Flink:如何使用 Java 映射流(或包含 DTO 的映射流)?
Apache Flink: How do I use a stream of Java Map (or Map containing DTOs)?
我正在使用 Flink,有一个 JSON 字符串流到达我的系统,其中包含动态变化的字段和嵌套字段。所以我不能模拟并将这个传入的 JSON 转换为静态 POJO,我必须依赖 Map 来代替。
我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将地图包装在名为 Data 的 DTO 中。
(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
在这个转换处理之后,问题出现了,我试图将结果流输入我的自定义 Flink 接收器。 invoke 函数不会在接收器中被调用。但是,如果我从这个包含 DTO 的 Map 更改为原始或没有 Map 的常规 DTO,则接收器可以工作。
我的 DTO 如下所示:
public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields
我尝试了以下两种解决方案:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class;
env.getConfig().disableGenericTypes();
有专家建议我可以在这种情况下使用吗?
我能够解决这个问题。在我的 Flink 日志中,我看到没有找到一个名为 ReflectionSerializerFactory class 的 Kryo 文件。我在 Maven 中更新了 Kryo 版本,并为我的地图使用了 Flink 文档说 Flink 支持的地图类型。
只需确保在您的代码中指定了泛型类型,并在地图的 POJO 中添加 getter 和 setter。
我还使用 .returns(xyz.class) 类型减速来避免类型擦除的影响。
我正在使用 Flink,有一个 JSON 字符串流到达我的系统,其中包含动态变化的字段和嵌套字段。所以我不能模拟并将这个传入的 JSON 转换为静态 POJO,我必须依赖 Map 来代替。
我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将地图包装在名为 Data 的 DTO 中。
(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
在这个转换处理之后,问题出现了,我试图将结果流输入我的自定义 Flink 接收器。 invoke 函数不会在接收器中被调用。但是,如果我从这个包含 DTO 的 Map 更改为原始或没有 Map 的常规 DTO,则接收器可以工作。
我的 DTO 如下所示:
public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields
我尝试了以下两种解决方案:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class;
env.getConfig().disableGenericTypes();
有专家建议我可以在这种情况下使用吗?
我能够解决这个问题。在我的 Flink 日志中,我看到没有找到一个名为 ReflectionSerializerFactory class 的 Kryo 文件。我在 Maven 中更新了 Kryo 版本,并为我的地图使用了 Flink 文档说 Flink 支持的地图类型。
只需确保在您的代码中指定了泛型类型,并在地图的 POJO 中添加 getter 和 setter。
我还使用 .returns(xyz.class) 类型减速来避免类型擦除的影响。