Apache Spark 中的转换过程

Transformation process in Apache Spark

转换基于现有的 RDD 创建新的 RDD。基本上,RDD 是不可变的,Spark 中的所有转换都是惰性的。 RDD 中的数据在执行操作之前不会被处理,但在不处理数据的情况下,新的 RDD 是如何创建的?例如,在 filter 操作中,如何在不实际将 RDD 加载到内存中并进行处理的情况下创建新的 RDD?

Spark 转换操作惰性。这些操作不会立即计算出来,它只记住应用于 RDD 的转换和 returns 指向操作输出的指针。只有在对其应用操作时才会计算转换操作。应用操作后,spark 会将操作分解为任务并将它们分发到节点上执行。

Question : For example, in filter operation how are new RDD created without actually loading the RDDs into memory and processing it?

Apache Spark 中的转换过程:

例如:

firstRDD=spark.textFile("hdfs://...")

secondRDD=firstRDD.filter(someFunction);

thirdRDD = secondRDD.map(someFunction);

result = thirdRDD.count()

由于 RDD 是通过一组转换创建的,它记录这些转换,而不是实际数据(这就像 需要什么的行动计划如果我们用这个特定的 predivate 进行过滤就完成了。生成一个 RDD 的这些转换的图称为如下所示的沿袭图。

上述示例中的 Spark RDD 沿袭图为:

请看RDD.scala 仅当遇到使用您的 filter 的谓词时,它才会创建新的 RDD。 这就像行动计划。只有当你调用像 count.

这样的动作时,这个计划才会被执行
/*** Return a new RDD containing only the elements that satisfy a predicate.
       */
      def filter(f: T => Boolean): RDD[T] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[T, T](
          this,
          (context, pid, iter) => iter.filter(cleanF),
          preservesPartitioning = true)
      }
  • 惰性求值意味着当我们在 RDD 上调用转换(例如,调用 map() )时,操作不会立即执行。
  • 相反,Spark 会在内部记录元数据以指示已请求此操作。 与其将 RDD 视为包含特定数据,不如将每个 RDD 视为包含有关如何计算我们通过转换构建的数据的指令。
  • 将数据加载到 RDD 中会以与转换相同的方式延迟求值。因此,当我们调用 sc.textFile() 时,只有在必要时才会加载数据。与转换一样,操作(在本例中为读取数据)可以发生多次。

惰性评估:(更正您的引用“Spark 中的所有转换都是惰性的”到“Spark 中的所有转换都是延迟计算的")

Spark computes RDDs lazily the first time they are used in an action, so that it can pipeline transformations. So , in above example RDD will be evaluated only when count() action is invoked.

希望对您有所帮助...