在 Spark 中广播 Joda DateTime 时出错

Error when broadcasting Joda DateTime in Spark

将 Joda Time 与 Spark 结合使用时,以下代码会导致 java.lang.NullPointerException

val todayBroadcast = sc.broadcast(new DateTime())
val dataRDD2 = dataRDD.filter(item => {
                                todayBroadcast.value.minusMonths(1).isBefore(item._1)
                              })

另一方面,下面的代码可以正常工作

    val dataRDD2 = dataRDD.filter(item => {
                                    val today = new DateTime()
                                    today.minusMonths(1).isBefore(item._1)
                                  })

据我所知,Joda 在 Apache Spark 提供的默认序列化方面存在一些问题。特别是问题出在 Kryo 序列化器上。

您可以查看 SO 线程。

无论如何,请尝试禁用 Kryo 序列化并使用标准 Java 序列化器 org.apache.spark.serializer.JavaSerializer。您可以在 Spark 安装的 spark-defaults.conf 中找到 属性 spark.serializer

现在,您应该有以下 属性:

spark.serializer=org.apache.spark.serializer.KryoSerializer

您必须更改为

spark.serializer=org.apache.spark.serializer.JavaSerializer

然后,重新启动 Spark 安装。如果您使用的是某些特定的发行版(即 Cloudera),请使用他们为您提供的管理控制台更改上述 属性。

如果您不能使用标准序列化程序,您可以将 DateTime 转换为其他一些序列化友好的格式,例如 StringLong(时间以毫秒为单位)

让我们知道。

如果你想继续使用 Kryo 序列化和 joda 日期时间,你可以改为这样做。这使用已经从 https://github.com/magro/kryo-serializers

创建的序列化程序

创建一个扩展 KryoRegistrator

的 class
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {

  import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaLocalDateSerializer, JodaLocalDateTimeSerializer}
  import org.joda.time.{DateTime, LocalDate, LocalDateTime}

  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[DateTime], new JodaDateTimeSerializer())
    kryo.register(classOf[LocalDate], new JodaLocalDateSerializer())
    kryo.register(classOf[LocalDateTime], new JodaLocalDateTimeSerializer())
  }
}

然后用 sparkconf

注册 class
set("spark.kryo.registrator", "MyRegistrator")

这将正确序列化 joda 日期时间和本地日期

查看文档 https://spark.apache.org/docs/0.6.1/tuning.html -> 数据序列化