Register Java Class in Flink Cluster
I am running my fat JAR on a Flink cluster; it reads from Kafka and saves to Cassandra. The code is:
final Properties prop = getProperties();
final FlinkKafkaConsumer<String> flinkConsumer =
        new FlinkKafkaConsumer<>(kafkaTopicName, new SimpleStringSchema(), prop);
flinkConsumer.setStartFromEarliest();

final DataStream<String> stream = env.addSource(flinkConsumer);
DataStream<Person> sensorStreaming = stream.flatMap(new FlatMapFunction<String, Person>() {
    @Override
    public void flatMap(String value, Collector<Person> out) throws Exception {
        try {
            out.collect(objectMapper.readValue(value, Person.class));
        } catch (JsonProcessingException e) {
            logger.error("Json Processing Exception", e);
        }
    }
});

savePersonDetails(sensorStreaming);
env.execute();
and the Person POJO contains:
@Column(name = "event_time")
private Instant eventTime;
The Cassandra side needs a codec to store the Instant, so I register one:
final Cluster cluster = ClusterManager.getCluster(cassandraIpAddress);
cluster.getConfiguration().getCodecRegistry().register(InstantCodec.instance);
When I run it standalone it works fine, but when I run it on a local cluster it throws the following error:
Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [timestamp <-> java.time.Instant]
at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:526)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
at com.datastax.driver.core.CodecRegistry.access0(CodecRegistry.java:140)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)
I read the documentation on registering custom serializers:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html
but InstantCodec is a third-party class. How do I register it?
I solved this problem. A LocalDateTime value was being emitted, and when I converted it using the same type, the above error occurred. I changed the field type to java.util.Date and then it worked.
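The fix can be sketched as follows. This is a minimal, self-contained illustration (the class and method names here are hypothetical, not from the original code): the POJO field type changes from java.time.Instant to java.util.Date, for which the DataStax driver already ships a built-in timestamp codec, and the parsed Instant is converted with Date.from before the row is written.

```java
import java.time.Instant;
import java.util.Date;

public class EventTimeConversion {

    // In the real POJO the field would become:
    //   @Column(name = "event_time")
    //   private Date eventTime;
    // so no custom InstantCodec registration is needed on the cluster.

    // Convert the Instant parsed from the Kafka JSON payload into a
    // java.util.Date the driver's default timestamp codec can handle.
    public static Date toDate(Instant instant) {
        return Date.from(instant);
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2021-01-01T00:00:00Z");
        Date stored = toDate(eventTime);
        // The conversion is lossless at millisecond precision,
        // so round-tripping back to Instant recovers the same value.
        System.out.println(stored.toInstant().equals(eventTime));
    }
}
```

The round trip via Date.toInstant() shows the conversion preserves the timestamp, so switching the field type avoids the codec problem without losing the event time.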