Spark BlockManager running on localhost
I have a simple script file that I am trying to execute in a spark-shell, mimicking the tutorial here:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
sc.stop();
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("mesos://zk://172.24.51.171:2181/mesos")
  .set("spark.executor.uri", "hdfs://172.24.51.171:8020/spark-1.3.0-bin-hadoop2.4.tgz")
  .set("spark.driver.host", "172.24.51.142")
val sc2 = new SparkContext(conf)
val file = sc2.textFile("hdfs://172.24.51.171:8020/input/pg4300.txt")
val errors = file.filter(line => line.contains("ERROR"))
errors.count()
My namenode and mesos master are on 172.24.51.171; my own IP address is 172.24.51.142. I save these lines to a file and then launch it with the following command:
/opt/spark-1.3.0-bin-hadoop2.4/bin/spark-shell -i WordCount.scala
My remote executors all die with errors similar to the following:
15/04/08 14:30:39 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to localhost/127.0.0.1:48554
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon.createAndStart(NettyBlockTransferService.scala:78)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote.apply(BlockManager.scala:594)
at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote.apply(BlockManager.scala:592)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:592)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:586)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:136)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:136)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply$mcVI$sp(TorrentBroadcast.scala:136)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:119)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:48554
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
... 1 more
This failure happens after I run the errors.count() command. Earlier in my shell, after I create the new SparkContext, I see the following lines:
15/04/08 14:31:18 INFO NettyBlockTransferService: Server created on 48554
15/04/08 14:31:18 INFO BlockManagerMaster: Trying to register BlockManager
15/04/08 14:31:18 INFO BlockManagerMasterActor: Registering block manager localhost:48554 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 48554)
15/04/08 14:31:18 INFO BlockManagerMaster: Registered BlockManager
What I think is happening is that Spark records the BlockManager's address as localhost:48554 and then sends that to all the executors, which then try to talk to their own localhost:48554 instead of the driver's IP address on port 48554. Why does Spark use localhost as the BlockManager's address instead of spark.driver.host?
Additional information
The Spark config has a spark.blockManager.port but no spark.blockManager.host; there is only spark.driver.host, which, as you can see, I set in my SparkConf.
This may be related to this JIRA Ticket, although that looked like a networking problem. My network is configured with DNS just fine.
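A quick way to check what the driver JVM resolves its own hostname to (a minimal sketch using only the standard java.net API): if the machine's hostname maps to 127.0.0.1 in /etc/hosts, Spark can end up advertising localhost for the BlockManager even when DNS itself is fine.
import java.net.InetAddress
// Run this on the driver machine (e.g. inside spark-shell):
// if it prints 127.0.0.1, the driver will likely register itself as localhost.
val local = InetAddress.getLocalHost
println(s"hostname = ${local.getHostName}, address = ${local.getHostAddress}")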
Try setting SPARK_LOCAL_IP (on the command line) or spark.local.ip (through the SparkConf object).
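A minimal sketch of both options, reusing the driver IP from the question (172.24.51.142); whether spark.local.ip is picked up depends on the Spark version, so treat it as this answer's suggestion rather than a guaranteed setting:
// Option 1: export the environment variable before launching spark-shell:
//   export SPARK_LOCAL_IP=172.24.51.142
//   /opt/spark-1.3.0-bin-hadoop2.4/bin/spark-shell -i WordCount.scala

// Option 2: set it on the SparkConf before creating the new context:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("mesos://zk://172.24.51.171:2181/mesos")
  .set("spark.driver.host", "172.24.51.142")
  .set("spark.local.ip", "172.24.51.142") // as suggested above
val sc2 = new SparkContext(conf)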
You can try providing the Spark master address with the --master argument when invoking spark-shell (or add it to spark-defaults.conf). I had a similar problem (see my post Spark Shell Listens on localhost instead of configured IP address): the BlockManager seems to listen on localhost when the context is created dynamically in the shell.
Logs:
When using the original context (listening on the hostname):
BlockManagerInfo: Added broadcast_1_piece0 in memory on ubuntu64server2:33301
When creating a new context (listening on localhost):
BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40235
I had to connect to a Cassandra cluster and was able to query it by providing spark.cassandra.connection.host in spark-defaults.conf and importing the package com.datastax.spark.connector._ in the spark shell.
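For reference, a rough sketch of what this answer describes; the hostnames, keyspace, and table names below are placeholders, and the spark-defaults.conf lines are shown as comments so everything stays in one snippet:
// In conf/spark-defaults.conf (placeholder values):
//   spark.master                      spark://ubuntu64server2:7077
//   spark.cassandra.connection.host   ubuntu64server2
//
// The DataStax connector jar also needs to be on the classpath
// (for example via spark-shell --jars or --packages).

// Then, inside spark-shell:
import com.datastax.spark.connector._

// cassandraTable is added to SparkContext by the connector's implicits;
// "my_keyspace" and "my_table" are illustrative names only.
val rows = sc.cassandraTable("my_keyspace", "my_table")
println(rows.count())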