使用 spark 加载由 --files 参数分发的共享库 (.so)
Loading shared libraries (.so) distributed by --files argument with spark
我正在尝试在 运行 启动 spark 作业时使用外部本机库(.so 文件)。首先,我使用 --files
参数提交文件。
在创建 SparkContext
后加载我正在使用 System.load(SparkFiles.get(libname))
的库(以确保填充 SparkFiles
)。
问题是该库仅由驱动程序节点加载,当任务尝试访问本机方法时,我得到
WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, 13.0.0.206, executor 0): java.lang.UnsatisfiedLinkError
唯一对我有用的是在 运行 启动 spark 应用程序之前将 .so
文件复制到所有工作人员,并创建一个将在每个任务之前加载库的 Scala 对象(可以使用 mapPartitions
).
进行优化
我试过使用
--conf "spark.executor.extraLibraryPath=/local/path/to/so" \
--conf "spark.driver.extraLibraryPath=/local/path/to/so"
试图避免这种情况,但没有成功。
现在因为我使用 EMR 来 运行 激发作业,而不是一致的集群,
我想避免在 运行 完成作业之前将文件复制到所有节点。
有什么建议吗?
解决方案比我想象的要简单 - 我只需要为每个 JVM 加载一次库
所以基本上我需要的是使用 --files
添加库文件并创建一个 Loader 对象:
object LibraryLoader {
lazy val load = System.load(SparkFiles.get("libname"))
}
并在每个任务之前使用它(map
、filter
等)
例如
rdd.map { x =>
LibraryLoader.load
// do some stuff with x
}
惰性将确保在填充 SparkFiles 后创建对象,并且每个 JVM 都进行一次评估。
我正在尝试在 运行 启动 spark 作业时使用外部本机库(.so 文件)。首先,我使用 --files
参数提交文件。
在创建 SparkContext
后加载我正在使用 System.load(SparkFiles.get(libname))
的库(以确保填充 SparkFiles
)。
问题是该库仅由驱动程序节点加载,当任务尝试访问本机方法时,我得到
WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, 13.0.0.206, executor 0): java.lang.UnsatisfiedLinkError
唯一对我有用的是在 运行 启动 spark 应用程序之前将 .so
文件复制到所有工作人员,并创建一个将在每个任务之前加载库的 Scala 对象(可以使用 mapPartitions
).
我试过使用
--conf "spark.executor.extraLibraryPath=/local/path/to/so" \
--conf "spark.driver.extraLibraryPath=/local/path/to/so"
试图避免这种情况,但没有成功。
现在因为我使用 EMR 来 运行 激发作业,而不是一致的集群, 我想避免在 运行 完成作业之前将文件复制到所有节点。
有什么建议吗?
解决方案比我想象的要简单 - 我只需要为每个 JVM 加载一次库
所以基本上我需要的是使用 --files
添加库文件并创建一个 Loader 对象:
object LibraryLoader {
lazy val load = System.load(SparkFiles.get("libname"))
}
并在每个任务之前使用它(map
、filter
等)
例如
rdd.map { x =>
LibraryLoader.load
// do some stuff with x
}
惰性将确保在填充 SparkFiles 后创建对象,并且每个 JVM 都进行一次评估。