How do I use joda.time in Flink (or how do I use typeutils.runtime.kryo)?
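In a Flink project I use a case class click:
case class click(date: LocalDateTime, stbId: String, channelId: Int)
A DataSet was populated with instances of this class, and everything worked fine while date was a Java 8 java.time.LocalDateTime. After switching to org.joda.time (version 2.9) in a Java 7 environment, calls on the click objects in the DataSet no longer behave as before: some methods on the date field, such as getHourOfDay and toString, throw NullPointerExceptions. I was able to verify that the date field of the click objects is not null.
I suspect that the Joda-Time library does not interact well with Kryo serialization.
The Flink API has org.apache.flink.api.java.typeutils.runtime.kryo.Serializers with a static method registerJodaTime, which seems to be related. I simply tried: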
import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)
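That was not enough.
Am I on the right track? How do I use java.typeutils.runtime.kryo?
Flink version used: 0.9.1; Scala: 2.10; joda.time: 2.9.
Follow-up:
This is the exact code I added as suggested (thanks to Fabian and Robert):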
val env = ExecutionEnvironment.getExecutionEnvironment
//import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)
In the log file of the embedded execution I can find the following relevant parts:
16:44:53,998 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
16:44:54,545 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map()
16:44:57,369 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
Nevertheless, I still get the following:
Exception in thread "main" java.lang.NullPointerException
at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
at java.lang.String.valueOf(Unknown Source)
at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
at myflink.click.toString(Ingestor.scala:20)
...
You should register the Joda serializers in the ExecutionConfig of the ExecutionEnvironment:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Serializers.registerJodaTime(env.getConfig());
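Hope this helps.
Flink uses Kryo for types that it cannot serialize itself. LocalDateTime is such a class.
Unfortunately, Kryo cannot serialize it correctly either, so we have to tell Kryo how to do it by providing a dedicated serializer for this class.
- Add de.javakaffee:kryo-serializers as a dependency: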
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.30</version>
</dependency>
(Note that adding this dependency may cause problems when using Flink on a cluster. Please let me know if it does.)
- Register the new serializer with the ExecutionEnvironment:
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])
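The two steps above can be put together as a small, self-contained job. This is only a sketch, assuming the Flink 0.9.1 Scala API from the question and the kryo-serializers dependency shown above; the object name JodaClickJob and the sample values are made up for illustration. The log in the question lists registered serializers only for DateTime and Interval, which is why LocalDateTime is registered explicitly here.

```scala
import org.apache.flink.api.scala._
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer

// Same shape as the case class from the question.
case class click(date: LocalDateTime, stbId: String, channelId: Int)

// Hypothetical job object, for illustration only.
object JodaClickJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the dedicated serializer on the environment that runs the job.
    // This is done before any DataSet is created, so that serializers built
    // for the data set can see the registration.
    env.registerTypeWithKryoSerializer(
      classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

    // Made-up sample record.
    val clicks = env.fromElements(
      click(new LocalDateTime(2015, 11, 20, 16, 44), "stb-1", 7))

    // Access the date field, which is the kind of call that failed in the question.
    clicks.map(c => c.date.getHourOfDay).print()
  }
}
```

With DEBUG logging enabled, the "Registered Kryo with Serializer Classes types" line should then also contain an entry for org.joda.time.LocalDateTime.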
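Hope this helps (I'm keeping the old answer for reference).
Some general notes on debugging Kryo/serializer issues in Flink:
When executing a job locally (this should also work with the ./bin/flink frontend, but the output is probably in the log/ directory), you should see something like this: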
14:05:52,863 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 15 registered types and 2 default Kryo serializers
14:05:52,943 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
14:05:53,150 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
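The number of registered types and Kryo serializers is greater than 0.
With the DEBUG log level (replace INFO with DEBUG in log4j.properties) you can actually get more details about the registered serializers: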
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
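For reference, a minimal log4j.properties that produces DEBUG output like the above could look as follows. This is a sketch for log4j 1.x; the appender name console and the layout pattern are assumptions, so adapt them to whatever your existing file already defines.

```properties
# Root logger at DEBUG instead of INFO; "console" is an assumed appender name
log4j.rootLogger=DEBUG, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```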