在 Spark 中广播 Joda DateTime 时出错
Error when broadcasting Joda DateTime in Spark
将 Joda Time 与 Spark 结合使用时,以下代码会导致 java.lang.NullPointerException
val todayBroadcast = sc.broadcast(new DateTime())
val dataRDD2 = dataRDD.filter(item => {
todayBroadcast.value.minusMonths(1).isBefore(item._1)
})
另一方面,下面的代码可以正常工作
val dataRDD2 = dataRDD.filter(item => {
val today = new DateTime()
today.minusMonths(1).isBefore(item._1)
})
据我所知,Joda 在 Apache Spark 提供的默认序列化方面存在一些问题。特别是问题出在 Kryo 序列化器上。
您可以查看 SO 线程。
无论如何,请尝试禁用 Kryo 序列化并使用标准 Java 序列化器 org.apache.spark.serializer.JavaSerializer
。您可以在 Spark 安装的 spark-defaults.conf
中找到 属性 spark.serializer
。
现在,您应该有以下 属性:
spark.serializer=org.apache.spark.serializer.KryoSerializer
您必须更改为
spark.serializer=org.apache.spark.serializer.JavaSerializer
然后,重新启动 Spark 安装。如果您使用的是某些特定的发行版(即 Cloudera),请使用他们为您提供的管理控制台更改上述 属性。
如果您不能使用标准序列化程序,您可以将 DateTime
转换为其他一些序列化友好的格式,例如 String
或 Long
(时间以毫秒为单位)
让我们知道。
如果你想继续使用 Kryo 序列化和 joda 日期时间,你可以改为这样做。这使用已经从 https://github.com/magro/kryo-serializers
创建的序列化程序
创建一个扩展 KryoRegistrator
的 class
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {
import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer}
import org.joda.time.{DateTime, LocalDate, LocalDateTime}
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[DateTime], new JodaDateTimeSerializer())
kryo.register(classOf[LocalDate], new JodaLocalDateSerializer())
kryo.register(classOf[LocalDateTime], new JodaLocalDateTimeSerializer())
}
}
然后用 sparkconf
注册 class
set("spark.kryo.registrator", "MyRegistrator")
这将正确序列化 joda 日期时间和本地日期
查看文档 https://spark.apache.org/docs/0.6.1/tuning.html -> 数据序列化
将 Joda Time 与 Spark 结合使用时,以下代码会导致 java.lang.NullPointerException
val todayBroadcast = sc.broadcast(new DateTime())
val dataRDD2 = dataRDD.filter(item => {
todayBroadcast.value.minusMonths(1).isBefore(item._1)
})
另一方面,下面的代码可以正常工作
val dataRDD2 = dataRDD.filter(item => {
val today = new DateTime()
today.minusMonths(1).isBefore(item._1)
})
据我所知,Joda 在 Apache Spark 提供的默认序列化方面存在一些问题。特别是问题出在 Kryo 序列化器上。
您可以查看
无论如何,请尝试禁用 Kryo 序列化并使用标准 Java 序列化器 org.apache.spark.serializer.JavaSerializer
。您可以在 Spark 安装的 spark-defaults.conf
中找到 属性 spark.serializer
。
现在,您应该有以下 属性:
spark.serializer=org.apache.spark.serializer.KryoSerializer
您必须更改为
spark.serializer=org.apache.spark.serializer.JavaSerializer
然后,重新启动 Spark 安装。如果您使用的是某些特定的发行版(即 Cloudera),请使用他们为您提供的管理控制台更改上述 属性。
如果您不能使用标准序列化程序,您可以将 DateTime
转换为其他一些序列化友好的格式,例如 String
或 Long
(时间以毫秒为单位)
让我们知道。
如果你想继续使用 Kryo 序列化和 joda 日期时间,你可以改为这样做。这使用已经从 https://github.com/magro/kryo-serializers
创建的序列化程序创建一个扩展 KryoRegistrator
的 classimport com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {
import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer}
import org.joda.time.{DateTime, LocalDate, LocalDateTime}
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[DateTime], new JodaDateTimeSerializer())
kryo.register(classOf[LocalDate], new JodaLocalDateSerializer())
kryo.register(classOf[LocalDateTime], new JodaLocalDateTimeSerializer())
}
}
然后用 sparkconf
注册 classset("spark.kryo.registrator", "MyRegistrator")
这将正确序列化 joda 日期时间和本地日期
查看文档 https://spark.apache.org/docs/0.6.1/tuning.html -> 数据序列化