为什么 foreach 没有给驱动程序带来任何东西?

Why does foreach not bring anything to the driver program?

我用spark写了这个程序shell

val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))

这会打印一些调试语句,但不会打印实际数字。

下面的代码工作正常

for(num <- array.take(4)) {
  println(num)
}

我明白 take 是一个动作,因此会导致 spark 触发惰性计算。

但是 foreach 应该以相同的方式工作...为什么 foreach 没有从 spark 带回任何东西并开始进行实际处理(退出惰性模式)

如何使 rdd 上的 foreach 工作?

Spark 中的 RDD.foreach 方法在集群上运行,因此包含这些记录的每个 worker 都是 运行 foreach 中的操作。 IE。您的代码是 运行,但它们是在 Spark worker stdout 上打印出来的,而不是在 driver/your shell 会话中。如果您查看 Spark worker 的输出 (stdout),您会看到这些打印到控制台。

您可以通过转到每个 运行 执行程序的 web gui 运行 查看工作人员的标准输出。例如 URL 是 http://workerIp:workerPort/logPage/?appId=app-20150303023103-0043&executorId=1&logType=stdout

本例中Spark选择将RDD的所有记录放在同一个分区中。

如果您考虑一下,这是有道理的 - 查看 foreach 的函数签名 - 它没有 return 任何东西。

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit

这确实是 foreach 在 scala 中的目的 - 它用于副作用。

当您收集记录时,您将它们带回驱动程序,因此逻辑上 collect/take 操作只是 运行 在 Spark 驱动程序中的 Scala 集合上 - 您可以看到日志输出作为 spark driver/spark shell 是什么在您的会话中打印到标准输出。

foreach 的一个用例可能不会立即显现出来,举个例子——如果你想为 RDD 中的每条记录做一些外部行为,比如调用 REST api,你可以在foreach,然后每个 Spark worker 将使用该值提交对 API 服务器的调用。如果 foreach 确实带回了记录,您可以轻松地耗尽 driver/shell 进程中的内存。通过这种方式,您可以避免这些问题,并且可以对集群上 RDD 中的所有项目产生副作用。

如果您想查看我使用的 RDD 中的内容;

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large

你可以使用RDD.toLocalIterator()将数据带到驱动程序(一次一个RDD分区):

val array = sc.parallelize(List(1, 2, 3, 4))
for(rec <- array.toLocalIterator) { println(rec) }

另见

  • Spark: Best practice for retrieving big data from RDD to local machine
  • this blog post 关于 toLocalIterator