Accessing Google Storage with SparkR on a bdutil-deployed cluster

I have been using bdutil for a year now, together with hadoop and spark, and it works perfectly! Now I have a small problem getting SparkR to work with Google Storage as HDFS.

Here is my setup:
- bdutil 1.2.1
- a deployed cluster with 1 master node and 1 worker node, with Spark 1.3.0 installed
- R and SparkR installed on both the master and the worker

When I run SparkR on the master node, I have tried several ways of pointing at a directory on my GS bucket:

1) By setting the gs file system scheme

> file <- textFile(sc, "gs://xxxxx/dir/")
> count(file)
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library is available
15/05/27 12:02:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library not loaded
collect on 5 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No FileSystem for scheme: gs
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383)
        at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:66)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
Error: returnStatus == 0 is not TRUE

2) With an HDFS URL

> file <- textFile(sc, "hdfs://hadoop-stage-m:8020/dir/")
> count(file)
collect on 10 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
        at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hadoop-stage-m:8020/dir
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        ... 25 more
Error: returnStatus == 0 is not TRUE

3) With a path as I use it with Scala in my other Spark jobs: exactly the same error as in 2)

I am sure I am missing an obvious step. It would be great if someone could help me with this!

Thanks,

PS:我 100% 确定 gcs 连接器正在处理经典的 Scala 作业!

Short answer

You need core-site.xml, hdfs-site.xml, etc., as well as gcs-connector-1.3.3-hadoop1.jar, on the classpath. Accomplish this with:

export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR

You may also want other spark-env.sh settings; consider also running:

source /home/hadoop/spark-install/conf/spark-env.sh

before ./sparkR. If you are calling sparkR.init manually from R, this is not as necessary, since you will pass parameters such as master directly.
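
For reference, here is a minimal sketch of that manual route from an R session. The master URL is an assumption (master hostname from the question plus the standard standalone port); copy the real value from MASTER in /home/hadoop/spark-install/conf/spark-env.sh, and keep YARN_CONF_DIR exported as shown above so the backend still finds core-site.xml and the connector jar:

# Minimal sketch: manual SparkR initialization instead of the ./sparkR launcher.
# The master URL below is hypothetical -- take the real one from spark-env.sh.
library(SparkR)
sc <- sparkR.init(master = "spark://hadoop-stage-m:7077",
                  appName = "SparkR-gs-test")
file <- textFile(sc, "gs://xxxxx/dir/")   # same bucket path as in the question
count(file)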

Other possible pitfalls:

  1. Make sure your default Java is Java 7. If it is Java 6, run sudo update-alternatives --config java and select Java 7 as the default.
  2. Make sure you set the Spark version when building sparkR: SPARK_VERSION=1.3.0 ./install-dev.sh

Long answer

Generally, the "No FileSystem for scheme" error means we need to make sure core-site.xml is on the classpath; the second error I ran into after fixing the classpath was "java.lang.ClassNotFoundException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", which means we also need to add gcs-connector-1.3.3.jar to the classpath. Looking through the SparkR helper scripts, the main sparkR binary calls sparkR.init with the following:

  sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
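
As a quick sanity check (not part of the SparkR scripts themselves), you can verify from inside the sparkR shell whether those environment variables were actually picked up:

> Sys.getenv("MASTER")
> Sys.getenv("YARN_CONF_DIR")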

The MASTER environment variable is commonly found in spark-env.sh scripts, and indeed bdutil populates the MASTER environment variable under /home/hadoop/spark-install/conf/spark-env.sh. Normally this would suggest that simply adding source /home/hadoop/spark-install/conf/spark-env.sh should be enough to populate the necessary settings for SparkR, but if we look into the sparkR definition, we see this:

#' Initialize a new Spark Context.
#'
#' This function initializes a new SparkContext.
#'
#' @param master The Spark master URL.
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkRBackendPort The port to use for SparkR JVM Backend.
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"),
#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#'                  c("jarfile1.jar","jarfile2.jar"))
#'}

sparkR.init <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list(),
  sparkExecutorEnv = list(),
  sparkJars = "",
  sparkRLibDir = "") {

  <...>
  cp <- paste0(jars, collapse = collapseChar)

  yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
  if (yarn_conf_dir != "") {
    cp <- paste(cp, yarn_conf_dir, sep = ":")
  }
  <...>

    if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
      launchBackend(classPath = cp,
                    mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
                    args = path,
                    javaOpts = paste("-Xmx", sparkMem, sep = ""))
    } else {
      # TODO: We should deprecate sparkJars and ask users to add it to the
      # command line (using --jars) which is picked up by SparkSubmit
      launchBackendSparkSubmit(
          mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
          args = path,
          appJar = .sparkREnv$assemblyJarPath,
          sparkHome = sparkHome,
          sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
    }

This tells us three things:

  1. The default sparkR script has no way to pass sparkJars, so there does not currently appear to be a convenient way to pass libjars as a flag.
  2. There is a TODO to deprecate the sparkJars parameter anyway.
  3. Aside from the sparkJars parameter, the only other thing that goes into the cp/classPath argument is YARN_CONF_DIR (unless I am missing some other source of classpath additions, or I am using a different version of sparkR than you are). Fortunately, it also appears to use YARN_CONF_DIR even if you are not planning to run on YARN.

Altogether, this indicates that you probably want at least the variables from /home/hadoop/spark-install/conf/spark-env.sh, since at least some of the hooks appear to look for environment variables commonly defined there, and secondly that we should be able to hack YARN_CONF_DIR to specify a classpath that makes it find core-site.xml as well as adds gcs-connector-1.3.3.jar to the classpath.

So, the answer to your question is:

export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR

The /home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar part may need to be changed if you are using hadoop2 or some other gcs-connector version. That command fixes HDFS access as well as finding fs.gs.impl for the gcs-connector, and it makes sure the actual gcs-connector jar is on the classpath. It does not pull in spark-env.sh, so you may find it defaulting to running with MASTER=local. Assuming your worker nodes also have SparkR installed correctly, you may want to consider running the following instead:

source /home/hadoop/spark-install/conf/spark-env.sh
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
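
Once the backend comes up with that classpath, the read from the question should go through; the same bucket path is reused here purely for illustration:

> file <- textFile(sc, "gs://xxxxx/dir/")
> count(file)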

Some additional caveats based on what I have run into:

  1. You may find that your R installation is set up with an older Java version. If you run into "unsupported major.minor version 51.0", run sudo update-alternatives --config java and make Java 7 the default.
  2. If you are using Spark 1.3.0 together with SparkR's install-dev.sh, Spark may erroneously hang with "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory" when in fact the scheduler is failing fast with a serialVersionUID mismatch, which you can see in /hadoop/spark/logs/*Master*.out - the fix is to make sure you run install-dev.sh with the correct Spark version set: SPARK_VERSION=1.3.0 ./install-dev.sh