Spark - 连接异常失败:java.net.ConnectException - localhost

Spark - failed on connection exception: java.net.ConnectException - localhost

我在一台机器上 运行 hadoop 和 spark (Ubuntu 14.04)。 JPS 命令给我以下输出

hduser@ubuntu:~$ jps
4370 HRegionServer
6568 Jps
5555 RunJar
3744 TaskTracker
5341 RunJar
4120 HQuorumPeer
5790 SparkSubmit
3308 DataNode
4203 HMaster
3469 SecondaryNameNode
3079 NameNode
3587 JobTracker

我在 HDFS 中创建了一个简单的 csv 文件。文件的以下详细信息。

hduser@ubuntu:~$ hadoop fs -ls /user/hduser/file_for_spark/spark1.csv
Warning: $HADOOP_HOME is deprecated.

Found 1 items
-rw-r--r--   1 hduser supergroup        174 2015-04-16 08:14 /user/hduser/file_for_spark/spark1.csv

但是在尝试从 spark 访问文件时出现连接失败异常:java.net.ConnectException:连接被拒绝错误

          ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_21)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val pagecount=sc.textFile("hdfs://localhost:3079/user/hduser/file_for_spark/spark1.csv")
pagecount: org.apache.spark.rdd.RDD[String] = hdfs://localhost:3079/user/hduser/file_for_spark/spark1.csv MapPartitionsRDD[1] at textFile at <console>:21

scala> pagecount.count()
java.net.ConnectException: Call to localhost/127.0.0.1:3079 failed on connection exception: java.net.ConnectException: Connection refused
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:1099)
    at org.apache.hadoop.ipc.Client.call(Client.java:1075)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at com.sun.proxy.$Proxy11.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC.<init>(<console>:39)
    at $iwC.<init>(<console>:41)
    at <init>(<console>:43)
    at .<init>(<console>:47)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:692)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:489)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:434)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access00(Client.java:184)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1206)
    at org.apache.hadoop.ipc.Client.call(Client.java:1050)
    ... 73 more

编辑 1:

@保罗 该命令给出以下输出

hduser@ubuntu:~$ lsof -p 3079 -a -i
COMMAND  PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    3079 hduser   65u  IPv4  17260      0t0  TCP *:33736 (LISTEN)
java    3079 hduser   75u  IPv4  17341      0t0  TCP localhost:54310 (LISTEN)
java    3079 hduser   85u  IPv4  22609      0t0  TCP *:50070 (LISTEN)
java    3079 hduser   89u  IPv4  59998      0t0  TCP localhost:54310->localhost:46507 (ESTABLISHED)

你能告诉我端口 46507 到底发生了什么吗

抱歉...实际上 namenode 运行 在我从 core-site.xml 中找到的不同端口中。使用端口 54310 后对我有用。

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:54310</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

谁能告诉我这里的3079是什么意思,这是JPS命令的输出。

这是对你自己的一个很好的回答! :)

3079 是namenode的pid(进程id)。在此处查看有关 jps 输出的更多详细信息:Jps doc。顺便说一句,您可以使用 pid 和命令验证 namenode 的监听端口,例如:

lsof -p 3079 -a -i

在您的命令输出中:

java    3079 hduser   89u  IPv4  59998      0t0  TCP localhost:54310->localhost:46507 (ESTABLISHED)

该行表示其他进程在端口 46507 与 Namenode 建立了 TCP 连接。可能的进程可能是资源管理器 (RM) 或数据节点等。