NPE 当 flink group case class 个对象时
NPE when flink group case class objects
我用的是dataSetAPI,我有两种caseclasses
case class Geo(country:Int, province:Int, city:Int, county:Int)
case class AntiFraudLog(
eventType: Int,
valid: Boolean
)
case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])
然后我生成了一个 key/value 对,它的值是 class.
val dataKeyValue: DataSet[(Long, AntiFraudLog)]
并尝试使用相同的键对项目进行分组
val groupedSortedData = dataKeyValue groupBy 0
然后将分组数据转换为另一种情况class
val sessionData:DataSet[AntiFraudSession] = groupedSortedData reduceGroup(
logs => AntiFraudSession(logs.map(_._2).toSeq)
)
但是我在运行程序的时候遇到了这样的异常
Caused by: java.lang.NullPointerException
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:32)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.api.scala.DataSet$$anon$$anonfun$flatMap.apply(DataSet.scala:417)
at org.apache.flink.api.scala.DataSet$$anon$$anonfun$flatMap.apply(DataSet.scala:417)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.flink.api.scala.DataSet$$anon.flatMap(DataSet.scala:417)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
有人知道怎么解决吗?
看起来 Flink 确实无法序列化具有空值集合字段的案例 class。在您的场景中,这将是带有 fraudLogs=null 的 AntiFraudSession。您认为是否有更多转换逻辑可能导致类似元素出现在 sessionData 中?
当你使用 Scala 在 Flink 中反序列化它时,请确保你在 case-class
中没有 null object/value。
为了避免 java.lang.NullPointerException
在 class fields/objects 的情况下使用 Option
根据你的例子:
如果任何字段为空
case class AntiFraudLog(
eventType: Option[Int],
valid: Boolean
)
如果 case class 对象为 null
case class AntiFraudSession(fraudLogs: Option[Seq[AntiFraudLog]])
注意:在 Scala 中使用 null 并不是一个好的 practice/standard。因此,请尝试使用 Scala 中提供的许多其他选项来避免这种情况。
有关详细信息,请单击 here。
我用的是dataSetAPI,我有两种caseclasses
case class Geo(country:Int, province:Int, city:Int, county:Int)
case class AntiFraudLog(
eventType: Int,
valid: Boolean
)
case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])
然后我生成了一个 key/value 对,它的值是 class.
val dataKeyValue: DataSet[(Long, AntiFraudLog)]
并尝试使用相同的键对项目进行分组
val groupedSortedData = dataKeyValue groupBy 0
然后将分组数据转换为另一种情况class
val sessionData:DataSet[AntiFraudSession] = groupedSortedData reduceGroup(
logs => AntiFraudSession(logs.map(_._2).toSeq)
)
但是我在运行程序的时候遇到了这样的异常
Caused by: java.lang.NullPointerException
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:32)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.api.scala.DataSet$$anon$$anonfun$flatMap.apply(DataSet.scala:417)
at org.apache.flink.api.scala.DataSet$$anon$$anonfun$flatMap.apply(DataSet.scala:417)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.flink.api.scala.DataSet$$anon.flatMap(DataSet.scala:417)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
有人知道怎么解决吗?
看起来 Flink 确实无法序列化具有空值集合字段的案例 class。在您的场景中,这将是带有 fraudLogs=null 的 AntiFraudSession。您认为是否有更多转换逻辑可能导致类似元素出现在 sessionData 中?
当你使用 Scala 在 Flink 中反序列化它时,请确保你在 case-class
中没有 null object/value。
为了避免 java.lang.NullPointerException
在 class fields/objects 的情况下使用 Option
根据你的例子:
如果任何字段为空
case class AntiFraudLog(
eventType: Option[Int],
valid: Boolean
)
如果 case class 对象为 null
case class AntiFraudSession(fraudLogs: Option[Seq[AntiFraudLog]])
注意:在 Scala 中使用 null 并不是一个好的 practice/standard。因此,请尝试使用 Scala 中提供的许多其他选项来避免这种情况。
有关详细信息,请单击 here。