What is the Scala case class equivalent in PySpark?

How would you go about employing and/or implementing a case class equivalent in PySpark?

If you go to the "Inferring the Schema Using Reflection" part of the sql-programming-guide, you will see the case class being defined as:

case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays.

For example:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()

In the same section, if you switch the example to python (pyspark), you will see that Row is used instead and defined as:

Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row.

For example:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = sqlContext.createDataFrame(people)

So the conclusion of the explanation is that Row can be used as the equivalent of a case class in pyspark.
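
For instance, fields of such a Row can be accessed by name or by key, much like the fields of a case class instance (a minimal illustrative sketch; the person record below is hypothetical):

from pyspark.sql import Row

person = Row(name="Alice", age=29)
person.name    # 'Alice'
person["age"]  # 29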

As mentioned by Alex Hall, the actual equivalent of a named product type is a namedtuple.

Unlike Row, suggested in the other answer, it has a number of useful properties:

  • Has a well-defined shape and can be reliably used for structural pattern matching:

    >>> from collections import namedtuple
    >>>
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    In contrast, Rows are not reliable when used with keyword arguments (in Spark < 3.0 keyword arguments are sorted by name):

    >>> from pyspark.sql import Row
    >>>
    >>> foobar = Row(foo=42, bar=-42)
    >>> foo, bar = foobar
    >>> foo
    -42
    >>> bar
    42
    

    although, if defined with positional arguments:

    >>> FooBar = Row("foo", "bar")
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    the order is preserved.

  • Defines proper types:

    >>> from functools import singledispatch
    >>> 
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> type(FooBar)
    <class 'type'>
    >>> isinstance(FooBar(42, -42), FooBar)
    True
    

    and can be used whenever type handling is required, especially with single dispatch:

    >>> import math  # used by area(Circle) below
    >>> Circle = namedtuple("Circle", ["x", "y", "r"])
    >>> Rectangle = namedtuple("Rectangle", ["x1", "y1", "x2", "y2"])
    >>>
    >>> @singledispatch
    ... def area(x):
    ...     raise NotImplementedError
    ... 
    ... 
    >>> @area.register(Rectangle)
    ... def _(x):
    ...     return abs(x.x1 - x.x2) * abs(x.y1 - x.y2)
    ... 
    ... 
    >>> @area.register(Circle)
    ... def _(x):
    ...     return math.pi * x.r ** 2
    ... 
    ... 
    >>>
    >>> area(Rectangle(0, 0, 4, 4))
    16
    >>> area(Circle(0, 0, 4))
    50.26548245743669
    

    and multiple dispatch:

    >>> from multipledispatch import dispatch
    >>> from numbers import Rational
    >>>
    >>> @dispatch(Rectangle, Rational)
    ... def scale(x, y):
    ...     return Rectangle(x.x1, x.y1, x.x2 * y, x.y2 * y)
    ... 
    ... 
    >>> @dispatch(Circle, Rational)
    ... def scale(x, y):
    ...     return Circle(x.x, x.y, x.r * y)
    ...
    ...
    >>> scale(Rectangle(0, 0, 4, 4), 2)
    Rectangle(x1=0, y1=0, x2=8, y2=8)
    >>> scale(Circle(0, 0, 11), 2)
    Circle(x=0, y=0, r=22)
    

    Combined with the first property, this can be used in a wide range of pattern-matching scenarios. namedtuples also support standard inheritance and type hints (see the sketch after the example below).

    Rows don't:

    >>> FooBar = Row("foo", "bar")
    >>> type(FooBar)
    <class 'pyspark.sql.types.Row'>
    >>> isinstance(FooBar(42, -42), FooBar)  # Expected failure
    Traceback (most recent call last):
    ...
    TypeError: isinstance() arg 2 must be a type or tuple of types
    >>> BarFoo = Row("bar", "foo")
    >>> isinstance(FooBar(42, -42), type(BarFoo))
    True
    >>> isinstance(BarFoo(42, -42), type(FooBar))
    True
    
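    A minimal sketch of the inheritance and type-hint support mentioned above (typing.NamedTuple is the type-hinted variant from the standard library; the Point helper is purely illustrative):

    >>> from collections import namedtuple
    >>> from typing import NamedTuple
    >>>
    >>> class FooBar(NamedTuple):               # type-hinted namedtuple
    ...     foo: int
    ...     bar: int
    ...
    >>> FooBar(42, -42).foo
    42
    >>> class Point(namedtuple("PointBase", ["x", "y"])):  # standard inheritance
    ...     def norm(self):
    ...         return (self.x ** 2 + self.y ** 2) ** 0.5
    ...
    >>> Point(3, 4).norm()
    5.0
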
  • Provides a highly optimized representation. Unlike Row objects, tuples don't use __dict__ or carry field names with each instance (see the sketch after the memory comparison below). As a result, initialization can be an order of magnitude faster:

    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> %timeit FooBar(42, -42)
    587 ns ± 5.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
    

    compared to the different Row constructors:

    >>> %timeit Row(foo=42, bar=-42)
    3.91 µs ± 7.67 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    >>> FooBar = Row("foo", "bar")
    >>> %timeit FooBar(42, -42)
    2 µs ± 25.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    

    and is significantly more memory efficient (a very important property when working with large-scale data):

    >>> import sys
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> sys.getsizeof(FooBar(42, -42))
    64
    

    compared to an equivalent Row:
    >>> sys.getsizeof(Row(foo=42, bar=-42))
    72
    
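    The structural difference claimed above (no per-instance __dict__, field names stored once on the class) can be checked directly; this is a rough sketch, and the exact Row internals depend on the Spark version:

    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> FooBar._fields                               # names stored on the class
    ('foo', 'bar')
    >>> hasattr(FooBar(42, -42), "__dict__")         # instances are bare tuples
    False
    >>> hasattr(Row(foo=42, bar=-42), "__fields__")  # each Row carries its names
    True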

    Finally, attribute access is an order of magnitude faster with namedtuple:
    >>> FooBar = namedtuple("FooBar", ["foo", "bar"])
    >>> foobar = FooBar(42, -42)
    >>> %timeit foobar.foo
    102 ns ± 1.33 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
    

    compared to the equivalent operation on a Row object:

    >>> foobar = Row(foo=42, bar=-42)
    >>> %timeit foobar.foo
    2.58 µs ± 26.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    
  • Last but not least, namedtuples are properly supported by Spark SQL:
    >>> Record = namedtuple("Record", ["id", "name", "value"])
    >>> spark.createDataFrame([Record(1, "foo", 42)])
    DataFrame[id: bigint, name: string, value: bigint]
    

To summarize:

It should be clear that Row is a very poor substitute for an actual product type and should be avoided unless enforced by the Spark API.

It should also be clear that pyspark.sql.Row is not intended to be a replacement for a case class when you consider that it is a direct equivalent of org.apache.spark.sql.Row - a type that is pretty far from an actual product and behaves like Seq[Any] (depending on the subclass, with names added). Both the Python and Scala implementations were introduced as a useful, albeit awkward, interface between external code and the internal Spark SQL representation.
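
A short sketch of that Seq[Any]-like behaviour, with names merely layered on top of positional access:

    >>> from pyspark.sql import Row
    >>> FooBar = Row("foo", "bar")
    >>> foobar = FooBar(42, -42)
    >>> foobar[0]          # positional access, like a plain tuple
    42
    >>> list(foobar)       # just a sequence of values
    [42, -42]
    >>> foobar.foo         # with a name attached on top
    42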

See also:

  • It wouldn't be fair not to mention the excellent MacroPy, developed by Li Haoyi, and its port MacroPy3, maintained by Alberto Berti:

    >>> import macropy.console
    0=[]=====> MacroPy Enabled <=====[]=0
    >>> from macropy.case_classes import macros, case
    >>> @case
    ... class FooBar(foo, bar): pass
    ... 
    >>> foobar = FooBar(42, -42)
    >>> foo, bar = foobar
    >>> foo
    42
    >>> bar
    -42
    

    It comes with a rich set of other features, including, but not limited to, advanced pattern matching and a concise lambda expression syntax.

  • Python dataclasses (Python 3.7+).
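
    A minimal sketch of the dataclass counterpart of the FooBar example above (frozen=True approximates the immutability of a case class; astuple is needed for tuple-style unpacking):

    >>> from dataclasses import dataclass, astuple
    >>>
    >>> @dataclass(frozen=True)
    ... class FooBar:
    ...     foo: int
    ...     bar: int
    ...
    >>> foobar = FooBar(42, -42)
    >>> foobar.foo
    42
    >>> astuple(foobar)
    (42, -42)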