连接到 Spark 集群时的序列化问题

Serialization issues when connecting to Spark cluster

我有一个用 Scala 编写的 Spark 应用程序,它可以写入和读取 Parquet 文件。 该应用公开了一个 HTTP API,并且当它收到请求时,通过在应用的整个生命周期中持久存在的长期上下文将工作发送到 Spark 集群。 然后它 returns 将结果发送给 HTTP 客户端。

当我使用本地模式时一切正常,local[*] 作为主模式。 但是,当我尝试连接到 Spark 集群时,我 运行 遇到了序列化问题。 使用 Spark 的默认序列化程序,我得到以下信息:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.FilterExec.otherPreds of type scala.collection.Seq in instance of org.apache.spark.sql.execution.FilterExec.

如果我启用 Kryo 序列化程序,我会得到 java.lang.IllegalStateException: unread block data

在尝试读取 Parquet 文件时会发生这种情况,但我认为这与 Parquet 文件本身没有任何关系,只是与发送到 Spark 集群的代码的序列化有关。

通过大量互联网搜索,我了解到这可能是由于 Spark 版本之间甚至 Java 版本之间不兼容造成的。 但是使用的版本是相同的。

该应用程序是用 Scala 2.12.8 编写的,并随 Spark 2.4.3 一起提供。 Spark集群是运行Spark 2.4.3(用Scala 2.12编译的版本)。 并且 Spark 集群和应用程序所在的机器 运行 正在使用 openJDK 1.8.0_212.

根据另一个 Internet 搜索,问题可能是因为 spark.master URL 不匹配。 因此,我将 spark-defaults.conf 中的 spark.master 设置为我在应用程序中用于连接到它的相同值。

然而,这并没有解决问题,我现在 运行 没主意了。

我不完全确定潜在的解释是什么,但我通过将我的应用程序的 jar 复制到 Spark 的 jars 目录中来修复它。然后我仍然遇到一个错误,但是一个不同的错误:关于 Cats/kernel/Eq class 丢失的东西。所以我将 cats-kernel 的 jar 添加到 Spark 的 jars 目录中。

现在一切正常。我在另一个 Stack Overflow 线程中读到的内容可能会解释它:

I think that whenever you do any kind of map operation using a lambda which is referring to methods/classes of your project, you need to supply them as an additional jar. Spark does serializes the lambda itself, but is not pulling together its dependencies. Not sure why the error message is not informative at all.