Spark RDD 的 take(1) 和 first() 的区别

Difference between Spark RDD's take(1) and first()

我以前一直以为rdd.take(1)rdd.first()是一模一样的。然而,在我的同事向我指出 Spark's officiation documentation on RDD:

之后,我开始怀疑这是否真的如此

first(): Return the first element in this RDD.

take(num): Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

我的问题是:

  1. first()的底层实现是否与take(1)相同?
  2. 假设 rdd1rdd2 是从同一个 csv 构造的,我可以安全地假设 rdd1.take(1)rdd2.first() 总是 return 相同的结果,即 csv 的第一行?如果 rdd1rdd2 分区不同怎么办?

不,两者不一样。

rdd.first() 将 Return 此 RDD 中的第一个元素 而 rdd.take(1) 将 return 一个只有第一个元素的数组。

  1. first()的底层实现和take(1)一样吗?

答案:在实现方面,first() 在内部调用 take(1),return 是数组 return 由 take(1) 编辑的第一个也是唯一的元素。摘自 org.apache.spark.rdd.RDD class

  /**
   * Return the first element in this RDD.
   */
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }
  1. 假设 rdd1 和 rdd2 是从同一个 csv 构造的,我可以安全地假设 rdd1.take(1)rdd2.first() 总是 return 相同的结果,即第一行格式文件?如果rdd1和rdd2分区不同怎么办?

回答:是的,您可以假设,分区不会改变读取输入的顺序。

事实上first是根据take实现的。

以下内容摘自 RDD.scala 的 spark 来源。 first 调用 take(1) 并且 return 找到第一个元素。

  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }

take(num) 尝试从 RDD 的第 0 个分区开始获取 num 个元素(如果考虑基于 0 的索引)。所以 take(1) 和 first 的行为是相同的。

甚至 spark programming guide 也证实了这一点。

关于你的第二个问题:这取决于你所说的不同分区是什么意思。如果您在调用 sc.textFile("/path/to/file") 时使用或不使用 numPartitions,都没有关系,因为第 0 个分区将始终是第 0 个分区。所以是的,您可以假设它们将具有相同的第一个元素。

编辑:RDD 中的分区是有序的,CSV 中的物理第一行将在 RDD 的第 0 个分区中结束。 take(1)first 都将 return 第 0 个分区的第一行。

所以,看起来都是一样的,但我们确实有不同。

1.When我们从文件中读取数据,默认是一个RDD,一个RDD同时具有first()take()属性。
2.The first() 属性 returns 行类型对象,而 take() returns 列表类型。

但是 一旦我们使用 .toDF() 将我们的 RDD 转换为 DataFrame,我们就没有那个 DF 上的 first() 属性。

希望它能进一步理清概念。

See the image for more clearity