Spark中DataFrame、Dataset和RDD的区别

Difference between DataFrame, Dataset, and RDD in Spark

我只是想知道 RDDDataFrame 之间有什么区别(Spark 2.0.0 DataFrame 只是 Dataset[Row] 的类型别名) 在 Apache Spark 中?

你能把一个转换成另一个吗?

DataFrame 定义良好,google 搜索 "DataFrame definition":

A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.

因此,DataFrame 由于其表格格式而具有额外的元数据,这允许 Spark 运行 对最终查询进行某些优化。

另一方面,

一个 RDD 仅仅是一个 R弹性 D 分布式 D 数据集更像是一个无法优化的数据黑盒,因为可以对其执行的操作不受限制。

但是,您可以通过其 rdd 方法从 DataFrame 转到 RDD,也可以从 RDD 转到 DataFrame(如果RDD是表格格式)通过toDF方法

一般来说 由于内置查询优化,建议尽可能使用 DataFrame

DataFrame 相当于 RDBMS 中的 table,也可以用类似于 RDD 中的 "native" 分布式集合的方式进行操作。与 RDD 不同,Dataframes 跟踪模式并支持各种关系操作,从而实现更优化的执行。 每个 DataFrame 对象代表一个逻辑计划,但由于它们的 "lazy" 性质,在用户调用特定的 "output operation".

之前不会执行

只是 RDD 是核心组件,但 DataFrame 是 spark 1.30 中引入的 API。

RDD

名为 RDD 的数据分区集合。这些 RDD 必须遵循一些属性,例如:

  • 不可变,
  • 容错,
  • 分布式,
  • 更多。

此处 RDD 是结构化或非结构化的。

数据帧

DataFrame 是 Scala、Java、Python 和 R 中可用的 API。它允许处理任何类型的结构化和半结构化数据。要定义 DataFrame,分布式数据的集合组织成名为 DataFrame 的命名列。您可以轻松优化 DataFrame 中的 RDDs。 您可以使用 DataFrame.

一次处理 JSON 数据、parquet 数据、HiveQL 数据
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")

val sample_DF = sampleRDD.toDF()

此处Sample_DF视为DataFramesampleRDD 是(原始数据)称为 RDD

因为 DataFrame 是弱类型的,开发人员无法从类型系统中获益。例如,假设您想从 SQL 和 运行 中读取一些内容:

val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

当您说 people("deptId") 时,您返回的不是 IntLong,您返回的是您需要的 Column 对象进行操作。在具有丰富类型系统的语言(如 Scala)中,您最终会失去所有类型安全性,这会增加 运行-time 错误的数量,这些错误可能会在编译时被发现。

反之,输入DataSet[T]。当你这样做时:

val people: People = val people = sqlContext.read.parquet("...").as[People]

你实际上得到了一个 People 对象,其中 deptId 是一个实际的整数类型而不是列类型,因此利用了类型系统。

从 Spark 2.0 开始,DataFrame 和 DataSet API 将统一,其中 DataFrame 将是 DataSet[Row].

的类型别名

First thing is DataFrame was evolved from SchemaRDD.

是的.. DataframeRDD 之间的转换是绝对可能的。

下面是一些示例代码片段。

  • df.rddRDD[Row]

以下是创建数据框的一些选项。

  • 1) yourrddOffrow.toDF 转换为 DataFrame.

  • 2) 使用 createDataFrame 的 sql 上下文

    val df = spark.createDataFrame(rddOfRow, schema)

where schema can be from some of below options
From scala case class and scala reflection api

import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]

OR using Encoders

import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema

as described by Schema can also be created using StructType and StructField

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true)) etc...

In fact there Are Now 3 Apache Spark APIs..

  1. RDD API :

The RDD (Resilient Distributed Dataset) API has been in Spark since the 1.0 release.

The RDD API provides many transformation methods, such as map(), filter(), and reduce() for performing computations on the data. Each of these methods results in a new RDD representing the transformed data. However, these methods are just defining the operations to be performed and the transformations are not performed until an action method is called. Examples of action methods are collect() and saveAsObjectFile().

RDD 示例:

rdd.filter(_.age > 21) // transformation
   .map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action

示例:使用 RDD

按属性过滤
rdd.filter(_.age > 21)
  1. DataFrame API

Spark 1.3 introduced a new DataFrame API as part of the Project Tungsten initiative which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass data between nodes, in a much more efficient way than using Java serialization.

The DataFrame API is radically different from the RDD API because it is an API for building a relational query plan that Spark’s Catalyst optimizer can then execute. The API is natural for developers who are familiar with building query plans

示例 SQL 样式:

df.filter("age > 21");

限制: 因为代码通过名称引用数据属性,所以编译器不可能捕捉到任何错误。如果属性名称不正确,则只会在运行时,即创建查询计划时检测到错误。

DataFrame API 的另一个缺点是它非常 scala-centric,虽然它确实支持 Java,但支持是有限的。

例如,当从现有 RDD 个 Java 个对象创建 DataFrame 时,Spark 的 Catalyst 优化器无法推断模式并假定 DataFrame 中的任何对象都实现了 scala.Product界面。 Scala case class 因为他们实现了这个接口,所以可以正常工作。

  1. Dataset API

The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds; the familiar object-oriented programming style and compile-time type-safety of the RDD API but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.

When it comes to serializing data, the Dataset API has the concept of encoders which translate between JVM representations (objects) and Spark’s internal binary format. Spark has built-in encoders which are very advanced in that they generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object. Spark does not yet provide an API for implementing custom encoders, but that is planned for a future release.

Additionally, the Dataset API is designed to work equally well with both Java and Scala. When working with Java objects, it is important that they are fully bean-compliant.

示例DatasetAPISQL样式:

dataset.filter(_.age < 21);

评价差异。在 DataFrameDataSet 之间:

Catalist level flow..(从 spark 峰会揭秘 DataFrame 和 Dataset 演示文稿)

进一步阅读...数据块article - A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets

Apache Spark 提供三种类型的 APIs

  1. RDD
  2. DataFrame
  3. 数据集

这是 APIRDD、Dataframe 和 Dataset 之间的比较。

RDD

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

RDD 特点:-

  • 分布式集合:
    RDD 使用 MapReduce 操作,该操作被广泛用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错。

  • Immutable: RDDs由一组被分区的记录组成。分区是RDD中并行的基本单元,每个分区是数据的一个逻辑分区,是immutable,通过对现有partitions.Immutability的一些转换创建的,有助于实现计算的一致性。

  • 容错: 如果我们丢失了 RDD 的一些分区,我们可以在沿袭中重放该分区的转换以实现相同的计算,而不是跨多个 nodes.This 进行数据复制特性是 RDD 的最大好处,因为它节省了在数据管理和复制方面付出了很多努力,从而实现了更快的计算。

  • 惰性计算: Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记得应用于某些基础数据集的转换。仅当操作需要将结果 returned 到驱动程序时才计算转换。

  • 函数转换: RDD 支持两种类型的操作:转换,从现有数据集创建新数据集,以及操作,在对数据集进行 运行 计算后,return 为驱动程序提供一个值。

  • 数据处理格式:
    它可以轻松高效地处理结构化数据和非结构化数据。

  • 支持的编程语言:
    RDD API 在 Java、Scala、Python 和 R.

    中可用

RDD 限制:-

  • 没有内置优化引擎: 在处理结构化数据时,RDD 无法利用 Spark 的高级优化器,包括催化剂优化器和 Tungsten 执行引擎。开发者需要根据每个RDD的属性对其进行优化。

  • 处理结构化数据: 与 Dataframe 和数据集不同,RDD 不会推断所摄取数据的架构,而是需要用户指定它。

数据帧

Spark 在 Spark 1.3 版本中引入了 Dataframes。 Dataframe 克服了 RDD 面临的主要挑战。

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a R/Python Dataframe. Along with Dataframe, Spark also introduced catalyst optimizer, which leverages advanced programming features to build an extensible query optimizer.

数据框功能:-

  • 行对象的分布式集合: DataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库中的 table,但在底层进行了更丰富的优化。

  • 数据处理: 处理结构化和非结构化数据格式(Avro、CSV、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE tables、MySQL 等)。它可以从所有这些不同的数据源读取和写入。

  • 使用催化剂优化器进行优化: 它支持 SQL 查询和 DataFrame API。 Dataframe分四个阶段使用催化剂树转换框架,

     1.Analyzing a logical plan to resolve references
     2.Logical plan optimization
     3.Physical planning
     4.Code generation to compile parts of the query to Java bytecode.
    
  • Hive 兼容性: 使用 Spark SQL,您可以 运行 对现有 Hive 仓库进行未修改的 Hive 查询。它重用 Hive 前端和 MetaStore,并为您提供与现有 Hive 数据、查询和 UDF 的完全兼容性。

  • 钨: Tungsten 提供了一个物理执行后端,它显式管理内存并动态生成用于表达式评估的字节码。

  • 支持的编程语言:
    Dataframe API 在 Java、Scala、Python 和 R 中可用。

数据帧限制:-

  • 编译时类型安全: 如前所述,Dataframe API 不支持编译时安全,这会限制您在结构未知时操作数据。以下示例在编译时有效。但是,执行此代码时会出现运行时异常。

示例:

case class Person(name : String , age : Int) 
val dataframe = sqlContext.read.json("people.json") 
dataframe.filter("salary > 10000").show 
=> throws Exception : cannot resolve 'salary' given input age , name

当您处理多个转换和聚合步骤时,这尤其具有挑战性。

  • 无法操作域对象(丢失域对象): 一旦将域对象转换为数据框,就无法从中重新生成它。在下面的例子中,一旦我们从 personRDD 创建了 personDF,我们就不会恢复 Person 的原始 RDD class (RDD[Person]).

示例:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]

数据集API

Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. It is a strongly-typed, immutable collection of objects that are mapped to a relational schema.

At the core of the Dataset, API is a new concept called an encoder, which is responsible for converting between JVM objects and tabular representation. The tabular representation is stored using Spark internal Tungsten binary format, allowing for operations on serialized data and improved memory utilization. Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.

数据集特征:-

  • 提供 RDD 和 Dataframe 的最佳选择: RDD(函数式编程、类型安全)、DataFrame(关系模型、查询优化、Tungsten 执行、排序和改组)

  • 编码器: 通过使用编码器,可以轻松地将任何 JVM 对象转换为数据集,从而允许用户使用结构化和非结构化数据,这与 Dataframe 不同。

  • 支持的编程语言: 数据集 API 目前仅在 Scala 和 Java 中可用。 Python 和 R 目前在 1.6 版本中不支持。 Python 计划支持 2.0 版。

  • 类型安全: Datasets API 提供了编译时安全性,这在 Dataframes 中是不可用的。在下面的示例中,我们可以看到 Dataset 如何使用编译 lambda 函数对域对象进行操作。

示例:

case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
 // error : value salary is not a member of person
ds.rdd // returns RDD[Person]
  • 可互操作:数据集允许您轻松地将现有的 RDD 和数据帧转换为数据集,而无需样板代码。

数据集API限制:-

  • 需要类型转换为字符串: 从数据集中查询数据目前需要我们将 class 中的字段指定为字符串。一旦我们查询了数据,我们就被迫将列转换为所需的数据类型。另一方面,如果我们对 Datasets 使用 map 操作,它不会使用 Catalyst 优化器。

示例:

ds.select(col("name").as[String], $"age".as[Int]).collect()

不支持 Python 和 R:从 1.6 版开始,数据集仅支持 Scala 和 Java。 Python 将在 Spark 2.0 中引入支持。

Datasets API 与现有的 RDD 和 Dataframe API 相比有几个优势,具有更好的类型安全性和功能 programming.With [=237= 中类型转换要求的挑战], 你仍然没有所需的类型安全并且会使你的代码变得脆弱。

Dataframe 是 Row 对象的 RDD,每个对象代表一条记录。一种 Dataframe 也知道其行的架构(即数据字段)。而数据框 看起来像常规的 RDD,在内部它们以更有效的方式存储数据,利用它们的模式。此外,它们提供了 RDD 上不可用的新操作,例如 运行 SQL 查询的能力。 Dataframes 可以从外部数据源、查询结果或常规 RDD 创建。

参考文献:Zaharia M., et al. Learning Spark(O'Reilly,2015 年)

大部分答案都是正确的只想在这里加一分

在 Spark 2.0 中,两个 APIs (DataFrame +DataSet) 将统一为一个 API.

"Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset of Row. In Python and R, given the lack of type safety, DataFrame is the main programming interface."

数据集类似于 RDD,但是,它们不使用 Java 序列化或 Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。

Spark SQL 支持两种不同的方法将现有 RDD 转换为数据集。第一种方法使用反射来推断包含特定类型对象的 RDD 的模式。这种基于反射的方法可以生成更简洁的代码,并且当您在编写 Spark 应用程序时已经知道架构时效果很好。

第二种创建数据集的方法是通过一个编程接口,它允许您构建一个模式,然后将其应用于现有的 RDD。虽然此方法更冗长,但它允许您在列及其类型直到运行时才知道的情况下构造数据集。

在这里你可以找到RDD tof Data frame对话答案

所有(RDD、DataFrame 和 DataSet)在一张图片中。

image credits

RDD

RDD is a fault-tolerant collection of elements that can be operated on in parallel.

DataFrame

DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

Dataset

Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.


Note:

Dataset of Rows (Dataset[Row]) in Scala/Java will often refer as DataFrames.


所有这些都与代码片段进行了很好的比较。

source


Q: Can you convert one to the other like RDD to DataFrame or vice-versa?

是的,两者都可以

1. RDDDataFrame.toDF()

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")

df.show()
+------+----+----+
|    id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+

更多方式:

2。 DataFrame/DataSetRDD 使用 .rdd() 方法

val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD

从使用角度来看,RDD 与 DataFrame 的一些见解:

  1. RDD 太棒了!因为它们为我们提供了处理几乎任何类型数据的所有灵活性;非结构化、半结构化和结构化数据。由于很多时候数据还没有准备好放入 DataFrame(甚至 JSON),RDD 可用于对数据进行预处理,使其适合 DataFrame。 RDD 是 Spark 中的核心数据抽象。
  2. 并非所有在 RDD 上可能的转换都在 DataFrame 上可行,例如 subtract() 用于 RDD vs except() 用于 DataFrame。
  3. 由于 DataFrame 就像一个关系 table,它们在使用 set/relational 理论转换时遵循严格的规则,例如,如果你想合并两个数据帧,则要求两个 dfs 具有相同数量的列和关联的列数据类型。列名可以不同。这些规则不适用于 RDD。 Here is a good tutorial 解释这些事实。
  4. 正如其他人已经深入解释的那样,使用 DataFrame 可以提高性能。
  5. 使用 DataFrame,您不需要像使用 RDD 编程时那样传递任意函数。
  6. 你需要 SQLContext/HiveContext 来编程数据帧,因为它们位于 spark 生态系统的 SparkSQL 区域,但对于 RDD,你只需要 SparkContext/JavaSparkContext,它位于 Spark Core 库中。
  7. 如果可以为 RDD 定义模式,则可以从 RDD 创建 df。
  8. 您还可以将 df 转换为 rdd,将 rdd 转换为 df。

希望对您有所帮助!

A DataFrame 是一个具有模式的 RDD。您可以将其视为关系数据库 table,因为每一列都有一个名称和一个已知类型。 DataFrames 的强大之处在于,当您从结构化数据集(Json、Parquet..)创建 DataFrame 时,Spark 能够通过以下方式推断模式遍历正在加载的整个 (Json, Parquet..) 数据集。然后,在计算执行计划时,Spark 可以使用该模式并进行更好的计算优化。 请注意,DataFrame 在 Spark v1.3.0

之前被称为 SchemaRDD

Spark RDD (resilient distributed dataset) :

RDD 是核心数据抽象API,自 Spark(Spark 1.0)首次发布以来就可用。它是一个 lower-level API 用于操作分布式数据集合。 RDD APIs 公开了一些非常有用的方法,可用于对底层物理数据结构进行非常严格的控制。它是分布在不同机器上的分区数据的 immutable(只读)集合。 RDD 支持在大型集群上进行 in-memory 计算,以容错方式加速大数据处理。 为了实现容错,RDD 使用由一组顶点和边组成的 DAG(有向无环图)。 DAG 中的顶点和边分别表示 RDD 和要应用于该 RDD 的操作。 RDD 上定义的转换是惰性的,仅在调用操作时执行

Spark DataFrame :

Spark 1.3 引入了两个新的数据抽象 APIs – DataFrame 和 DataSet。 DataFrame APIs 将数据组织到命名列中,就像关系数据库中的 table 一样。它使程序员能够在分布式数据集合上定义模式。 DataFrame 中的每一行都是对象类型行。与 SQL table 一样,每个列在 DataFrame 中的行数必须相同。简而言之,DataFrame 是惰性评估计划,它指定需要对分布式数据集合执行的操作。 DataFrame也是一个immutable集合

Spark DataSet :

作为DataFrame APIs的扩展,Spark 1.3还引入了DataSet APIs,在Spark中提供严格类型化和object-oriented编程接口。是immutable,type-safe分布式数据集合。与 DataFrame 一样,DataSet APIs 也使用 Catalyst 引擎来实现执行优化。 DataSet 是 DataFrame APIs.

的扩展

Other Differences -

Apache Spark – RDD、DataFrame 和 DataSet

Spark RDD

An RDD stands for Resilient Distributed Datasets. It is Read-only partition collection of records. RDD is the fundamental data structure of Spark. It allows a programmer to perform in-memory computations on large clusters in a fault-tolerant manner. Thus, speed up the task.

Spark 数据帧

Unlike an RDD, data organized into named columns. For example a table in a relational database. It is an immutable distributed collection of data. DataFrame in Spark allows developers to impose a structure onto a distributed collection of data, allowing higher-level abstraction.

Spark 数据集

Datasets in Apache Spark are an extension of DataFrame API which provides type-safe, object-oriented programming interface. Dataset takes advantage of Spark’s Catalyst optimizer by exposing expressions and data fields to a query planner.

一个。 RDD (Spark1.0) —> 数据框(Spark1.3) —> 数据集(Spark1.6)

b。 RDD 让我们决定我们想怎么做,这限制了 Spark 可以对底层处理进行的优化。 dataframe/dataset 让我们决定我们想做什么,让 Spark 决定如何进行计算。

c。 RDD 作为内存中的 jvm 对象,RDD 涉及垃圾收集和 Java(或更好一点的 Kryo)序列化的开销,这在数据增长时是昂贵的。那会降低性能。

Data frame 比 RDD 有巨大的性能提升,因为它有 2 个强大的特性:

  1. 自定义内存管理(又名 Project Tungsten)
  2. 优化的执行计划(又名 Catalyst 优化器)
    Performance wise RDD -> Data Frame -> Dataset

d。数据集(Project Tungsten 和 Catalyst Optimizer)如何在 Data frame 上得分是它的一个附加功能:Encoders