Spark 2.0 数据集与 DataFrame

Spark 2.0 Dataset vs DataFrame

从 spark 2.0.1 开始,我遇到了一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:

  1. df.select("foo")df.select($"foo") 之间的区别在于签名。前者至少取一String,后者取零或多Columns。除此之外没有实际区别。
  2. myDataSet.map(foo.someVal) 类型检查,但由于任何 Dataset 操作使用 RDD 个对象,并且与 DataFrame 操作相比,有一个显着高架。我们来看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    如您所见,此执行计划需要访问所有字段并且必须 DeserializeToObject.

  3. 没有。一般来说,其他方法不是 语法糖 并且生成明显不同的执行计划。例如:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    与之前显示的计划相比,它可以直接访问列。这与其说是 API 的限制,不如说是操作语义差异的结果。

  4. How could I df.select("foo") type-safe without a map statement?

    没有这个选项。虽然类型化的列允许您将静态 Dataset 转换为另一个静态类型的 Dataset:

    ds.select($"bar".as[Int])
    

    没有类型安全。还有一些其他尝试包括类型安全优化操作,,但这个实验性 API.

  5. why should I use a UDF / UADF instead of a map

    完全由您决定。 Spark 中的每个分布式数据结构都有自己的优点和缺点(参见示例 )。

就我个人而言,我发现静态类型 Dataset 最没用:

  • 不提供与Dataset[Row]相同范围的优化(尽管它们共享存储格式和一些执行计划优化,但不能完全受益于代码生成或堆外存储) 也无法访问 DataFrame.

  • 的所有分析功能
  • 类型转换是黑盒子,有效地为优化器创建了分析障碍。例如选择(过滤器)不能被推到类型转换上:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    
    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    

    相比于:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    
    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    

    这会影响谓词下推或投影下推等功能。

  • 不像 RDDs 那样灵活,仅支持一小部分类型。

  • "Type safety" 与 EncodersDataset 使用 as 方法转换时存在争议。因为数据形状没有使用签名编码,所以编译器只能验证 Encoder.
  • 的存在

相关问题:

Spark Dataset 比 Spark Dataframe 更强大。小例子 - 您只能创建 Dataframe RowTuple 或任何原始数据类型,但 Dataset 使您能够创建任何非原始类型的 Dataset也。也就是说,您可以从字面上创建 Dataset 对象类型。

例如:

case class Employee(id:Int,name:String)

Dataset[Employee]   // is valid
Dataframe[Employee] // is invalid

DATAFRAME:DataFrame 是一种允许数据模式视图的抽象。

案例class人(姓名:字符串,年龄:整数,地址:字符串)

定义class人

scala > val df = List ( Person ( “Sumanth”, 23, “BNG”)

DATAFRAME VS DATASET

DATASET:Data Set 是 Dataframe 的扩展 API,这是最新的抽象,它试图提供 RDD 和 Dataframe 的优点。