Accessing Google Storage with SparkR on bdutil deployed cluster
I have been using bdutil for a year now, together with hadoop and spark, and it has worked perfectly!
Now I have a small problem getting SparkR to work with Google Storage as HDFS.
Here is my setup:
- bdutil 1.2.1
- a deployed cluster with 1 master and 1 worker, with Spark 1.3.0 installed
- R and SparkR installed on both master and worker
When I run SparkR on the master node, I try to point to a directory on my GS bucket in several ways:
1) by setting the gs filesystem scheme
> file <- textFile(sc, "gs://xxxxx/dir/")
> count(file)
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library is available
15/05/27 12:02:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/27 12:02:02 WARN LoadSnappy: Snappy native library not loaded
collect on 5 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No FileSystem for scheme: gs
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1383)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
... 25 more
Error: returnStatus == 0 is not TRUE
2) with an HDFS URL
> file <- textFile(sc, "hdfs://hadoop-stage-m:8020/dir/")
> count(file)
collect on 10 failed with java.lang.reflect.InvocationTargetException
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58)
at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://hadoop-stage-m:8020/dir
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at edu.berkeley.cs.amplab.sparkr.BaseRRDD.getPartitions(RRDD.scala:31)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:312)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
... 25 more
Error: returnStatus == 0 is not TRUE
3) with the path I use for my other Spark jobs written in Scala: exactly the same error as in 2)
I'm sure I'm missing an obvious step. If anyone could help me with this, it would be great!
Thanks,
PS: I'm 100% sure the gcs connector works with classic Scala jobs!
Short answer
You need core-site.xml, hdfs-site.xml, etc., as well as gcs-connector-1.3.3-hadoop1.jar on the classpath. Accomplish this with:
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
You may also want other spark-env.sh settings; consider additionally running:
source /home/hadoop/spark-install/conf/spark-env.sh
before ./sparkR. If you're calling sparkR.init manually in R this isn't strictly necessary, since you'll pass arguments like master directly there.
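For illustration, a minimal sketch of that manual approach from an interactive R session; the paths are the bdutil defaults used in this answer, the "local" fallback is just an assumed placeholder, and the call follows the sparkR.init signature quoted in the long answer below:
# Assumed: the SparkR package is installed on the master; adjust paths to your cluster.
Sys.setenv(YARN_CONF_DIR = paste(
  "/home/hadoop/hadoop-install/conf",
  "/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar",
  sep = ":"))
library(SparkR)
# MASTER is populated by bdutil in spark-env.sh; fall back to local if it isn't set.
sc <- sparkR.init(master = Sys.getenv("MASTER", unset = "local"),
                  appName = "SparkR")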
Other possible gotchas:
- Make sure your default Java is Java 7. If it's Java 6, run
sudo update-alternatives --config java
and select Java 7 as the default.
- Make sure you set the Spark version when building sparkR:
SPARK_VERSION=1.3.0 ./install-dev.sh
Long answer
通常,"No FileSystem for scheme" 错误意味着我们需要确保 core-site.xml 在类路径上;在修复类路径后我 运行 遇到的第二个错误是 "java.lang.ClassNotFoundException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" 这意味着我们还需要将 gcs-connector-1.3.3.jar 添加到类路径中。查看 SparkR 帮助程序脚本,主要 sparkR
二进制调用 sparkR.init
具有以下内容:
sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
The MASTER environment variable is commonly found in spark-env.sh scripts, and indeed bdutil populates the MASTER environment variable under /home/hadoop/spark-install/conf/spark-env.sh. Normally this would suggest that simply adding source /home/hadoop/spark-install/conf/spark-env.sh would be enough to populate the necessary settings for SparkR, but if we peek inside the sparkR definition, we see this:
#' Initialize a new Spark Context.
#'
#' This function initializes a new SparkContext.
#'
#' @param master The Spark master URL.
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkRBackendPort The port to use for SparkR JVM Backend.
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#' list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="1g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#' c("jarfile1.jar","jarfile2.jar"))
#'}
sparkR.init <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list(),
  sparkExecutorEnv = list(),
  sparkJars = "",
  sparkRLibDir = "") {

  <...>
  cp <- paste0(jars, collapse = collapseChar)
  yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
  if (yarn_conf_dir != "") {
    cp <- paste(cp, yarn_conf_dir, sep = ":")
  }
  <...>
  if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
    launchBackend(classPath = cp,
                  mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
                  args = path,
                  javaOpts = paste("-Xmx", sparkMem, sep = ""))
  } else {
    # TODO: We should deprecate sparkJars and ask users to add it to the
    # command line (using --jars) which is picked up by SparkSubmit
    launchBackendSparkSubmit(
      mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
      args = path,
      appJar = .sparkREnv$assemblyJarPath,
      sparkHome = sparkHome,
      sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
  }
This tells us three things:
- The default sparkR script cannot pass sparkJars, so there doesn't currently appear to be a convenient way to pass libjars as a flag.
- There's a TODO to deprecate the sparkJars param anyway.
- Aside from the sparkJars param, the only other thing going into the cp/classPath argument is YARN_CONF_DIR (unless I'm missing some other source of classpath additions, or I'm using a different version of sparkR than you). Also, fortunately, it uses YARN_CONF_DIR even if you're not planning to run on YARN.
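To make that last point concrete, here is a small standalone sketch that just replays the classpath-assembly lines quoted above with an empty sparkJars (the paths are the bdutil defaults from this answer):
# Illustration only: reproduce the quoted logic to see what lands on the backend classpath.
Sys.setenv(YARN_CONF_DIR = "/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar")
jars <- ""           # sparkJars defaults to ""
collapseChar <- ":"
cp <- paste0(jars, collapse = collapseChar)
yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
if (yarn_conf_dir != "") {
  cp <- paste(cp, yarn_conf_dir, sep = ":")
}
cat(cp, "\n")
# -> :/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
# i.e. both the Hadoop conf dir (core-site.xml) and the gcs-connector jar end up on the SparkRBackend classpath.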
Altogether, this suggests you probably want at least the variables from /home/hadoop/spark-install/conf/spark-env.sh, since at least some of the hooks appear to look for environment variables commonly defined there, and secondly we should be able to hack YARN_CONF_DIR to specify both the classpath needed to find core-site.xml as well as to add gcs-connector-1.3.3.jar to the classpath.
So, the answer to your question is:
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
You may need to change the /home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar part if you're using hadoop2 or a different gcs-connector version. That command fixes both HDFS access and finding fs.gs.impl for the gcs-connector, as well as making sure the actual gcs-connector jar is on the classpath. It doesn't pull in spark-env.sh, so you might find it defaulting to running with MASTER=local. Assuming your worker nodes also have SparkR installed correctly, you may want to run the following instead:
source /home/hadoop/spark-install/conf/spark-env.sh
export YARN_CONF_DIR=/home/hadoop/hadoop-install/conf:/home/hadoop/hadoop-install/lib/gcs-connector-1.3.3-hadoop1.jar
./sparkR
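With the backend launched that way, a quick sanity check from the SparkR prompt (reusing the placeholder bucket path from the question) would look something like:
# If core-site.xml and the gcs-connector jar were picked up, the gs scheme should now resolve.
file <- textFile(sc, "gs://xxxxx/dir/")
count(file)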
A couple of additional notes based on what I've encountered:
- You may find your R installation is set to an older Java version. If you run into "unsupported major.minor version 51.0", run
sudo update-alternatives --config java
and make Java 7 the default.
- If you're using Spark 1.3.0 and SparkR's install-dev.sh, Spark may erroneously hang with "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory", when in fact the scheduler is fast-failing with a serialVersionUID mismatch, which you can see in /hadoop/spark/logs/*Master*.out. The solution is to make sure you run install-dev.sh with the right Spark version set: SPARK_VERSION=1.3.0 ./install-dev.sh