使用 sparkr 时,我应该在工作节点上预安装 cran r 包吗

should I pre-install cran r packages on worker nodes when using sparkr

我想在 cran 上使用 r 包,例如 forecast 等与 sparkr 一起使用,遇到以下两个问题。

  1. 我应该在工作节点上预安装所有这些包吗?但是当我阅读spark this file的源代码时,似乎spark会自动压缩包并通过--jars或--packages分发给worker。我应该怎么做才能使工作人员可以使用依赖项?

  2. 假设我需要在map转换中使用forecast提供的函数,应该如何导入包。我是否需要执行以下操作,在 map 函数中导入包,是否会进行多次导入: SparkR:::map(rdd, function(x){ library(forecast) then do other staffs })

更新:

阅读了更多的源代码后,似乎可以根据this file使用includePackage在工作节点上包含包。所以现在问题变成了我必须手动在节点上预安装软件包是否正确?如果这是真的,问题 1 中描述的 --jars 和 --packages 的用例是什么?如果那是错误的,如何使用 --jars 和 --packages 来安装软件包?

重复这个很无聊,但是你首先不应该使用内部 RDD API。它已在第一个官方 SparkR 版本中被删除,它根本不适合一般用途。

直到新的低级别 API* 准备就绪(参见示例 SPARK-12922 SPARK-12919, SPARK-12792)我不会将 Spark 视为 运行 纯 R 代码的平台。即使它发生变化,使用 R 包装器添加本机 (Java / Scala) 代码也是更好的选择。

话虽如此,让我们从您的问题开始:

  1. RPackageUtils 旨在处理使用 Spark 包创建的包。它不处理标准 R 库。

  2. 是的,您需要在每个节点上安装包。来自 includePackage 文档字符串:

    The package is assumed to be installed on every node in the Spark cluster.


* 如果您使用 Spark 2.0+,则可以使用 dapply、gapply 和 lapply 函数。

添加库适用于 spark 2.0+。例如,我在集群的所有节点中添加包预测。该代码适用于 Spark 2.0+ 和数据块环境。

schema <- structType(structField("out", "string"))
out <- gapply(
  df,
  c("p", "q"),
  function(key, x) 
  if (!all(c("forecast") %in% (.packages()))){
     if (!require("forecast")) {
        install.packages("forecast", repos ="http://cran.us.r-project.org", INSTALL_opts = c('--no-lock'))
     }
  }  
  #use forecast
  #dataframe out
  data.frame(out = x$column, stringAsFactor = FALSE)
}, 
schema)

更好的选择是通过spark-submit archive选项传递你本地的R包,这意味着你不需要在每个worker中安装R包,也不需要在运行期间安装和编译R包SparkR::dapply 用于耗时的等待。例如:

Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client --num-executors 40 --executor-cores 10 --executor-memory 8G --driver-memory 512M --jars /usr/lib/hadoop/lib/hadoop-lzo-0.4.15-cdh5.11.1.jar --files /etc/hive/conf/hive-site.xml --archives /your_R_packages/3.5.zip --files xgboost.model sparkr-shell")

调用SparkR::dapply函数时,让它先调用.libPaths("./3.5.zip/3.5")。并且您需要注意服务器版本 R 版本必须等于您的 zip 文件 R 版本。